4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yml
@@ -614,7 +614,7 @@ jobs:
export MAVEN_CLI_OPTS="--no-transfer-progress"
export JAVA_VERSION=${{ matrix.java }}
# It uses Maven's 'install' intentionally, see https://github.com/apache/spark/pull/26414.
-./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install
+./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install
rm -rf ~/.m2/repository/org/apache/spark

scala-213:
@@ -660,7 +660,7 @@ jobs:
- name: Build with SBT
run: |
./dev/change-scala-version.sh 2.13
-./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile test:compile
+./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile test:compile

tpcds-1g:
needs: [configure-jobs, precondition]
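Note: the only CI change above is adding the -Pvolcano profile to the existing Maven and SBT commands. A minimal local equivalent, as a sketch (profile names taken from the diff; the remaining flags trimmed for brevity):

# Build the Kubernetes module with the Volcano sources compiled in
./build/mvn -DskipTests -Pkubernetes -Pvolcano install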
2 changes: 1 addition & 1 deletion dev/scalastyle
@@ -17,7 +17,7 @@
# limitations under the License.
#

-SPARK_PROFILES=${1:-"-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"}
+SPARK_PROFILES=${1:-"-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive -Pvolcano"}

# NOTE: echo "q" is needed because SBT prompts the user for input on encountering a build file
# with failure (either resolution or compilation); the "q" makes SBT quit.
12 changes: 12 additions & 0 deletions project/SparkBuild.scala
@@ -421,6 +421,11 @@ object SparkBuild extends PomBuild {
// SPARK-14738 - Remove docker tests from main Spark build
// enable(DockerIntegrationTests.settings)(dockerIntegrationTests)

+if (!profiles.contains("volcano")) {
+  enable(Volcano.settings)(kubernetes)
+  enable(Volcano.settings)(kubernetesIntegrationTests)
+}

enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests)

enable(YARN.settings)(yarn)
Expand Down Expand Up @@ -956,6 +961,13 @@ object SparkR {
)
}

+object Volcano {
+  // Exclude all Volcano source files from Compile and Test (the volcano profile re-enables them)
+  lazy val settings = Seq(
+    unmanagedSources / excludeFilter := HiddenFileFilter || "*Volcano*.scala"
+  )
+}

object Unidoc {

import BuildCommons._
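Note the inversion here: Volcano.settings is an exclusion filter, so it is applied to the kubernetes projects only when the volcano profile is absent; passing -Pvolcano keeps the *Volcano*.scala sources in the compile set. A sketch of compiling them directly (assuming the kubernetes project id declared via sbt.project.name in the module pom):

./build/sbt -Pkubernetes -Pvolcano kubernetes/compile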
30 changes: 30 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
@@ -29,8 +29,25 @@
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
+<volcano.exclude>**/*Volcano*.scala</volcano.exclude>
</properties>

+<profiles>
+  <profile>
+    <id>volcano</id>
+    <properties>
+      <volcano.exclude></volcano.exclude>
+    </properties>
+    <dependencies>
+      <dependency>
+        <groupId>io.fabric8</groupId>
+        <artifactId>volcano-model-v1beta1</artifactId>
+        <version>${kubernetes-client.version}</version>
+      </dependency>
+    </dependencies>
+  </profile>
+</profiles>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -103,6 +120,19 @@


<build>
+<pluginManagement>
+  <plugins>
+    <plugin>
+      <groupId>net.alchim31.maven</groupId>
+      <artifactId>scala-maven-plugin</artifactId>
+      <configuration>
+        <excludes>
+          <exclude>${volcano.exclude}</exclude>
+        </excludes>
+      </configuration>
+    </plugin>
+  </plugins>
+</pluginManagement>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
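The Maven side mirrors the SBT toggle with a property: volcano.exclude defaults to **/*Volcano*.scala and is handed to scala-maven-plugin via pluginManagement above, while the volcano profile empties the pattern and adds the fabric8 volcano-model-v1beta1 dependency. A sketch of building just this module with the profile enabled (-pl/-am are standard Maven flags; module path from this pom):

./build/mvn -pl resource-managers/kubernetes/core -am -Pkubernetes -Pvolcano -DskipTests package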
60 changes: 60 additions & 0 deletions resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import io.fabric8.kubernetes.api.model._
import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod}

private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureConfigStep
with KubernetesExecutorCustomFeatureConfigStep {

private var kubernetesConf: KubernetesConf = _

private val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name"

private lazy val podGroupName = s"${kubernetesConf.appId}-podgroup"
private lazy val namespace = kubernetesConf.namespace

override def init(config: KubernetesDriverConf): Unit = {
kubernetesConf = config
}

override def init(config: KubernetesExecutorConf): Unit = {
kubernetesConf = config
}

override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = {
val podGroup = new PodGroupBuilder()
.editOrNewMetadata()
.withName(podGroupName)
.withNamespace(namespace)
.endMetadata()
.build()
Seq(podGroup)
}

override def configurePod(pod: SparkPod): SparkPod = {
val k8sPodBuilder = new PodBuilder(pod.pod)
.editMetadata()
.addToAnnotations(POD_GROUP_ANNOTATION, podGroupName)
.endMetadata()
val k8sPod = k8sPodBuilder.build()
SparkPod(k8sPod, pod.container)
}
}
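Nothing wires this step in by default; the integration test further down opts in through the user-configurable feature-step and scheduler-name properties. A hedged spark-submit sketch using only the configs exercised in this PR (master URL and application jar are placeholders):

bin/spark-submit \
  --master k8s://https://<apiserver> \
  --deploy-mode cluster \
  --conf spark.kubernetes.driver.scheduler.name=volcano \
  --conf spark.kubernetes.executor.scheduler.name=volcano \
  --conf spark.kubernetes.driver.pod.featureSteps=org.apache.spark.deploy.k8s.features.VolcanoFeatureStep \
  --conf spark.kubernetes.executor.pod.featureSteps=org.apache.spark.deploy.k8s.features.VolcanoFeatureStep \
  local:///path/to/examples.jar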
49 changes: 49 additions & 0 deletions resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import io.fabric8.volcano.scheduling.v1beta1.PodGroup

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._

class VolcanoFeatureStepSuite extends SparkFunSuite {

test("SPARK-36061: Driver Pod with Volcano PodGroup") {
val sparkConf = new SparkConf()
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
val step = new VolcanoFeatureStep()
step.init(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())

val annotations = configuredPod.pod.getMetadata.getAnnotations

assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup")
}

test("SPARK-36061: Executor Pod with Volcano PodGroup") {
val sparkConf = new SparkConf()
val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf)
val step = new VolcanoFeatureStep()
step.init(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())
val annotations = configuredPod.pod.getMetadata.getAnnotations
assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
}
}
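To run this suite in isolation, Spark's usual ScalaTest selectors should apply once -Pvolcano un-excludes the sources (a sketch; -DwildcardSuites/-Dtest=none is the standard Spark test-selection mechanism):

./build/mvn -pl resource-managers/kubernetes/core -Pkubernetes -Pvolcano test -Dtest=none -DwildcardSuites=org.apache.spark.deploy.k8s.features.VolcanoFeatureStepSuite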
33 changes: 33 additions & 0 deletions resource-managers/kubernetes/integration-tests/pom.xml
@@ -47,6 +47,7 @@

<test.exclude.tags></test.exclude.tags>
<test.include.tags></test.include.tags>
+<volcano.exclude>**/*Volcano*.scala</volcano.exclude>
</properties>
<packaging>jar</packaging>
<name>Spark Project Kubernetes Integration Tests</name>
@@ -74,9 +75,28 @@
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<type>test-jar</type>
</dependency>
+<dependency>
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
+  <version>${project.version}</version>
+  <scope>test</scope>
+</dependency>
</dependencies>

<build>
+<pluginManagement>
+  <plugins>
+    <plugin>
+      <groupId>net.alchim31.maven</groupId>
+      <artifactId>scala-maven-plugin</artifactId>
+      <configuration>
+        <excludes>
+          <exclude>${volcano.exclude}</exclude>
+        </excludes>
+      </configuration>
+    </plugin>
+  </plugins>
+</pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
@@ -209,5 +229,18 @@
</dependency>
</dependencies>
</profile>
+<profile>
+  <id>volcano</id>
+  <properties>
+    <volcano.exclude></volcano.exclude>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>volcano-client</artifactId>
+      <version>${kubernetes-client.version}</version>
+    </dependency>
+  </dependencies>
+</profile>
</profiles>
</project>
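The new volcano profile here mirrors the core module and also pulls in fabric8's volcano-client for the test code. One plausible way to run only the Volcano-tagged tests against a cluster that already has Volcano installed, as a sketch (reuses the test.include.tags property declared above; other required integration-test flags omitted):

./build/mvn -pl resource-managers/kubernetes/integration-tests -Pkubernetes -Pkubernetes-integration-tests -Pvolcano -Dtest.include.tags=volcano integration-test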
6 changes: 5 additions & 1 deletion resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -181,7 +181,7 @@ class KubernetesSuite extends SparkFunSuite
}
}

-before {
+protected def setUpTest(): Unit = {
appLocator = UUID.randomUUID().toString.replaceAll("-", "")
driverPodName = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "")
sparkAppConf = kubernetesTestComponents.newSparkAppConf()
@@ -195,6 +195,10 @@ class KubernetesSuite extends SparkFunSuite
}
}

+before {
+  setUpTest()
+}

after {
if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
kubernetesTestComponents.deleteNamespace()
33 changes: 33 additions & 0 deletions resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.integrationtest

import org.scalatest.Tag

class VolcanoSuite extends KubernetesSuite with VolcanoTestsSuite {

override protected def setUpTest(): Unit = {
super.setUpTest()
sparkAppConf
.set("spark.kubernetes.driver.scheduler.name", "volcano")
.set("spark.kubernetes.executor.scheduler.name", "volcano")
}
}

private[spark] object VolcanoSuite {
val volcanoTag = Tag("volcano")
}
70 changes: 70 additions & 0 deletions resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.integrationtest

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.volcano.client.VolcanoClient

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep
import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
import org.apache.spark.deploy.k8s.integrationtest.VolcanoSuite.volcanoTag

private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
import VolcanoTestsSuite._

protected def checkScheduler(pod: Pod): Unit = {
assert(pod.getSpec.getSchedulerName === "volcano")
}

protected def checkAnnotation(pod: Pod): Unit = {
val appId = pod.getMetadata.getLabels.get("spark-app-selector")
val annotations = pod.getMetadata.getAnnotations
assert(annotations.get("scheduling.k8s.io/group-name") === s"$appId-podgroup")
}

protected def checkPodGroup(pod: Pod): Unit = {
val appId = pod.getMetadata.getLabels.get("spark-app-selector")
val podGroupName = s"$appId-podgroup"
val volcanoClient = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
val podGroup = volcanoClient.podGroups().withName(podGroupName).get()
assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === pod.getMetadata.getName)
}

test("Run SparkPi with volcano scheduler", k8sTestTag, volcanoTag) {
sparkAppConf
.set("spark.kubernetes.driver.pod.featureSteps", VOLCANO_FEATURE_STEP)
.set("spark.kubernetes.executor.pod.featureSteps", VOLCANO_FEATURE_STEP)
runSparkPiAndVerifyCompletion(
driverPodChecker = (driverPod: Pod) => {
doBasicDriverPodCheck(driverPod)
checkScheduler(driverPod)
checkAnnotation(driverPod)
checkPodGroup(driverPod)
},
executorPodChecker = (executorPod: Pod) => {
doBasicExecutorPodCheck(executorPod)
checkScheduler(executorPod)
checkAnnotation(executorPod)
}
)
}
}

private[spark] object VolcanoTestsSuite extends SparkFunSuite {
val VOLCANO_FEATURE_STEP = classOf[VolcanoFeatureStep].getName
}
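checkPodGroup verifies that the generated PodGroup is owned by the driver pod; the same can be spot-checked by hand. A sketch, assuming the Volcano CRDs are installed so kubectl recognizes the podgroup resource (<appId> and <namespace> are placeholders):

# The feature step names the group "<appId>-podgroup" in the application's namespace
kubectl get podgroup <appId>-podgroup -n <namespace> -o yaml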