diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 3a9157010e9b..53b382e6969f 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -661,7 +661,7 @@ jobs:
         export MAVEN_CLI_OPTS="--no-transfer-progress"
         export JAVA_VERSION=${{ matrix.java }}
         # It uses Maven's 'install' intentionally, see https://github.com/apache/spark/pull/26414.
-        ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install
+        ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Pvolcano -Pyunikorn -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install
         rm -rf ~/.m2/repository/org/apache/spark

   scala-213:
@@ -707,7 +707,7 @@ jobs:
     - name: Build with SBT
       run: |
         ./dev/change-scala-version.sh 2.13
-        ./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile Test/compile
+        ./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Pyunikorn -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile Test/compile

   # Any TPC-DS related updates on this job need to be applied to tpcds-1g-gen job of benchmark.yml as well
   tpcds-1g:
diff --git a/dev/scalastyle b/dev/scalastyle
index 5f958b8fb0a7..374173146ef1 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -17,7 +17,7 @@
 # limitations under the License.
 #

-SPARK_PROFILES=${1:-"-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive -Pvolcano"}
+SPARK_PROFILES=${1:-"-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive -Pvolcano -Pyunikorn"}

 # NOTE: echo "q" is needed because SBT prompts the user for input on encountering a build file
 # with failure (either resolution or compilation); the "q" makes SBT quit.
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 21ab6f9f636a..dd2812ad44c0 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -426,6 +426,11 @@ object SparkBuild extends PomBuild {
       enable(Volcano.settings)(kubernetesIntegrationTests)
     }

+    if (!profiles.contains("yunikorn")) {
+      enable(YuniKorn.settings)(kubernetes)
+      enable(YuniKorn.settings)(kubernetesIntegrationTests)
+    }
+
     enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests)

     enable(YARN.settings)(yarn)
@@ -973,6 +978,13 @@ object Volcano {
   )
 }

+object YuniKorn {
+  // Exclude all YuniKorn files for Compile and Test
+  lazy val settings = Seq(
+    unmanagedSources / excludeFilter ~= { _ || "*YuniKorn*.scala" }
+  )
+}
+
 object Unidoc {

   import BuildCommons._
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 1c729cc441ee..4dc0cf9cdba3 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -30,6 +30,7 @@
     <sbt.project.name>kubernetes</sbt.project.name>
     <volcano.exclude>**/*Volcano*.scala</volcano.exclude>
+    <yunikorn.exclude>**/*YuniKorn*.scala</yunikorn.exclude>
   </properties>
@@ -51,6 +52,12 @@
+    <profile>
+      <id>yunikorn</id>
+      <properties>
+        <yunikorn.exclude></yunikorn.exclude>
+      </properties>
+    </profile>
@@ -134,6 +141,7 @@
               <exclude>${volcano.exclude}</exclude>
+              <exclude>${yunikorn.exclude}</exclude>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/YuniKornFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/YuniKornFeatureStep.scala
new file mode 100644
index 000000000000..61efecf0c6fc
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/YuniKornFeatureStep.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod}
+
+private[spark] class YuniKornFeatureStep extends KubernetesDriverCustomFeatureConfigStep
+  with KubernetesExecutorCustomFeatureConfigStep {
+
+  private var kubernetesConf: KubernetesConf = _
+
+  override def init(config: KubernetesDriverConf): Unit = {
+    kubernetesConf = config
+  }
+
+  override def init(config: KubernetesExecutorConf): Unit = {
+    kubernetesConf = config
+  }
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val k8sPodBuilder = new PodBuilder(pod.pod)
+      .editMetadata()
+        .addToAnnotations(YuniKornFeatureStep.AppIdAnnotationKey, kubernetesConf.appId)
+      .endMetadata()
+    val k8sPod = k8sPodBuilder.build()
+    SparkPod(k8sPod, pod.container)
+  }
+}
+
+object YuniKornFeatureStep {
+  val AppIdAnnotationKey = "yunikorn.apache.org/app-id"
+  val SchedulerName = "yunikorn"
+}
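For context (not part of the diff): YuniKornFeatureStep is opt-in, so an application enables it through Spark's pluggable feature-step configuration and points its pods at the yunikorn scheduler. A minimal sketch, reusing only the config keys and class name exercised by the integration tests later in this change:

    import org.apache.spark.SparkConf

    // Sketch: build driver and executor pods with YuniKornFeatureStep and hand them
    // to the yunikorn scheduler; the keys mirror YuniKornTestsSuite below.
    val conf = new SparkConf()
      .set("spark.kubernetes.scheduler.name", "yunikorn")
      .set("spark.kubernetes.driver.pod.featureSteps",
        "org.apache.spark.deploy.k8s.features.YuniKornFeatureStep")
      .set("spark.kubernetes.executor.pod.featureSteps",
        "org.apache.spark.deploy.k8s.features.YuniKornFeatureStep")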
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/YuniKornFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/YuniKornFeatureStepSuite.scala
new file mode 100644
index 000000000000..7e8de76f21a8
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/YuniKornFeatureStepSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+
+class YuniKornFeatureStepSuite extends SparkFunSuite {
+
+  test("SPARK-37809: Driver Pod with YuniKorn annotations") {
+    val sparkConf = new SparkConf()
+    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
+    val step = new YuniKornFeatureStep()
+    step.init(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+    val annotations = configuredPod.pod.getMetadata.getAnnotations
+    assert(annotations.get(YuniKornFeatureStep.AppIdAnnotationKey) === kubernetesConf.appId)
+  }
+
+  test("SPARK-37809: Executor Pod with YuniKorn annotations") {
+    val sparkConf = new SparkConf()
+    val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf)
+    val step = new YuniKornFeatureStep()
+    step.init(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+    val annotations = configuredPod.pod.getMetadata.getAnnotations
+    assert(annotations.get(YuniKornFeatureStep.AppIdAnnotationKey) === kubernetesConf.appId)
+  }
+}
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index e468523d254b..f52ab9c18a53 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -48,6 +48,7 @@
     <volcano.exclude>**/*Volcano*.scala</volcano.exclude>
+    <yunikorn.exclude>**/*YuniKorn*.scala</yunikorn.exclude>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project Kubernetes Integration Tests</name>
@@ -92,6 +93,7 @@
               <exclude>${volcano.exclude}</exclude>
+              <exclude>${yunikorn.exclude}</exclude>
@@ -242,5 +244,11 @@
+    <profile>
+      <id>yunikorn</id>
+      <properties>
+        <yunikorn.exclude></yunikorn.exclude>
+      </properties>
+    </profile>
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/YuniKornSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/YuniKornSuite.scala
new file mode 100644
index 000000000000..795c1e2429df
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/YuniKornSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import org.scalatest.Tag
+
+import org.apache.spark.deploy.k8s.features.YuniKornFeatureStep
+
+class YuniKornSuite extends KubernetesSuite with YuniKornTestsSuite {
+
+  override protected def setUpTest(): Unit = {
+    super.setUpTest()
+    sparkAppConf
+      .set("spark.kubernetes.scheduler.name", YuniKornFeatureStep.SchedulerName)
+  }
+}
+
+private[spark] object YuniKornSuite {
+  val yunikornTag = Tag("yunikorn")
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/YuniKornTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/YuniKornTestsSuite.scala
new file mode 100644
index 000000000000..bff2d583abb5
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/YuniKornTestsSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.features.YuniKornFeatureStep
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
+import org.apache.spark.deploy.k8s.integrationtest.YuniKornSuite.yunikornTag
+
+private[spark] trait YuniKornTestsSuite { k8sSuite: KubernetesSuite =>
+  import YuniKornTestsSuite._
+
+  protected def checkScheduler(pod: Pod): Unit = {
+    assert(pod.getSpec.getSchedulerName === YuniKornFeatureStep.SchedulerName)
+  }
+
+  protected def checkAnnotations(pod: Pod): Unit = {
+    val appId = pod.getMetadata.getLabels.get("spark-app-selector")
+    val annotations = pod.getMetadata.getAnnotations
+    assert(annotations.get(YuniKornFeatureStep.AppIdAnnotationKey) === appId)
+  }
+
+  test("Run SparkPi with yunikorn scheduler", k8sTestTag, yunikornTag) {
+    sparkAppConf
+      .set("spark.kubernetes.driver.pod.featureSteps", YUNIKORN_FEATURE_STEP)
+      .set("spark.kubernetes.executor.pod.featureSteps", YUNIKORN_FEATURE_STEP)
+    runSparkPiAndVerifyCompletion(
+      driverPodChecker = (driverPod: Pod) => {
+        doBasicDriverPodCheck(driverPod)
+        checkScheduler(driverPod)
+        checkAnnotations(driverPod)
+      },
+      executorPodChecker = (executorPod: Pod) => {
+        doBasicExecutorPodCheck(executorPod)
+        checkScheduler(executorPod)
+        checkAnnotations(executorPod)
+      }
+    )
+  }
+}
+
+private[spark] object YuniKornTestsSuite extends SparkFunSuite {
+  val YUNIKORN_FEATURE_STEP = classOf[YuniKornFeatureStep].getName
+}
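For context (not part of the diff): KubernetesDriverCustomFeatureConfigStep and KubernetesExecutorCustomFeatureConfigStep form a general pod-customization contract, so additional YuniKorn hints could be layered on with further user-supplied steps wired in the same way. A hypothetical sketch under that assumption; the queue annotation key and value below are illustrative assumptions, not something this change introduces:

    package org.apache.spark.deploy.k8s.features

    import io.fabric8.kubernetes.api.model.PodBuilder

    import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesExecutorConf, SparkPod}

    // Hypothetical example only: annotate every driver and executor pod with a fixed queue.
    private[spark] class YuniKornQueueFeatureStep extends KubernetesDriverCustomFeatureConfigStep
      with KubernetesExecutorCustomFeatureConfigStep {

      // This sketch needs no per-application state, so both init hooks are no-ops.
      override def init(config: KubernetesDriverConf): Unit = {}

      override def init(config: KubernetesExecutorConf): Unit = {}

      override def configurePod(pod: SparkPod): SparkPod = {
        val podWithQueue = new PodBuilder(pod.pod)
          .editMetadata()
            // Assumed annotation key and queue name; adjust to the target queue layout.
            .addToAnnotations("yunikorn.apache.org/queue", "root.default")
          .endMetadata()
          .build()
        SparkPod(podWithQueue, pod.container)
      }
    }

Such a step would be enabled exactly like YuniKornFeatureStep above, via spark.kubernetes.driver.pod.featureSteps and spark.kubernetes.executor.pod.featureSteps.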