4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yml
@@ -661,7 +661,7 @@ jobs:
export MAVEN_CLI_OPTS="--no-transfer-progress"
export JAVA_VERSION=${{ matrix.java }}
# It uses Maven's 'install' intentionally, see https://github.com/apache/spark/pull/26414.
-./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install
+./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Pvolcano -Pyunikorn -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install
rm -rf ~/.m2/repository/org/apache/spark

scala-213:
@@ -707,7 +707,7 @@ jobs:
- name: Build with SBT
run: |
./dev/change-scala-version.sh 2.13
-./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile Test/compile
+./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Pyunikorn -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile Test/compile

# Any TPC-DS related updates on this job need to be applied to tpcds-1g-gen job of benchmark.yml as well
tpcds-1g:
2 changes: 1 addition & 1 deletion dev/scalastyle
@@ -17,7 +17,7 @@
# limitations under the License.
#

-SPARK_PROFILES=${1:-"-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive -Pvolcano"}
+SPARK_PROFILES=${1:-"-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive -Pvolcano -Pyunikorn"}

# NOTE: echo "q" is needed because SBT prompts the user for input on encountering a build file
# with failure (either resolution or compilation); the "q" makes SBT quit.
12 changes: 12 additions & 0 deletions project/SparkBuild.scala
@@ -426,6 +426,11 @@ object SparkBuild extends PomBuild {
enable(Volcano.settings)(kubernetesIntegrationTests)
}

+if (!profiles.contains("yunikorn")) {
+  enable(YuniKorn.settings)(kubernetes)
+  enable(YuniKorn.settings)(kubernetesIntegrationTests)
+}

enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests)

enable(YARN.settings)(yarn)
@@ -973,6 +978,13 @@ object Volcano {
)
}

+object YuniKorn {
+  // Exclude all yunikorn file for Compile and Test
Review comment (Member): nit. file -> files. Last time, we missed this at the Volcano PR.

+  lazy val settings = Seq(
+    unmanagedSources / excludeFilter ~= { _ || "*YuniKorn*.scala" }
Review comment (Yikun, Member, Feb 28, 2022): Note for myself and other reviewers. :) It looks like we couldn't find any official sbt doc for this, but see:
[1] https://groups.google.com/g/simple-build-tool/c/zVMyoWRAVWg?hl=en
[2] https://stackoverflow.com/questions/41627627/sbt-0-13-8-what-does-the-settingkey-method-do

+  )
+}

object Unidoc {

import BuildCommons._
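For context on the `~=` syntax discussed in the review above: sbt's `~=` transforms a setting's existing value in place. A minimal sketch of how the exclude filter composes, assuming a plain sbt build (illustrative only, not part of this PR):

```scala
// Sketch: `~=` maps a function over the setting's current value.
// `excludeFilter` holds a FileFilter; FileFilter supports `||`, and a plain
// String is implicitly converted to a glob NameFilter, so this widens the
// existing filter to also drop any source named "*YuniKorn*.scala".
unmanagedSources / excludeFilter ~= { existing =>
  existing || "*YuniKorn*.scala"
}
```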
8 changes: 8 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
@@ -30,6 +30,7 @@
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
<volcano.exclude>**/*Volcano*.scala</volcano.exclude>
+<yunikorn.exclude>**/*YuniKorn*.scala</yunikorn.exclude>
</properties>

<profiles>
@@ -51,6 +52,12 @@
</dependency>
</dependencies>
</profile>
+<profile>
+  <id>yunikorn</id>
+  <properties>
+    <yunikorn.exclude></yunikorn.exclude>
+  </properties>
+</profile>
</profiles>

<dependencies>
@@ -134,6 +141,7 @@
<configuration>
<excludes>
<exclude>${volcano.exclude}</exclude>
+<exclude>${yunikorn.exclude}</exclude>
</excludes>
</configuration>
</plugin>
49 changes: 49 additions & 0 deletions resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/YuniKornFeatureStep.scala
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import io.fabric8.kubernetes.api.model._

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod}

private[spark] class YuniKornFeatureStep extends KubernetesDriverCustomFeatureConfigStep
with KubernetesExecutorCustomFeatureConfigStep {

private var kubernetesConf: KubernetesConf = _

override def init(config: KubernetesDriverConf): Unit = {
kubernetesConf = config
}

override def init(config: KubernetesExecutorConf): Unit = {
kubernetesConf = config
}

override def configurePod(pod: SparkPod): SparkPod = {
val k8sPodBuilder = new PodBuilder(pod.pod)
.editMetadata()
.addToAnnotations(YuniKornFeatureStep.AppIdAnnotationKey, kubernetesConf.appId)
Review comment (Contributor): So this is the one part that we can't do by just setting the scheduler name right now, correct? Are there any other parts that you think we need to add to the feature step in the future?

Reply (Contributor, Author): Correct. This is the one part needed to get this working for YuniKorn. Besides this one, there is one more piece needed: https://issues.apache.org/jira/browse/SPARK-38310. I will work on it once this one gets merged.

Review comment (Member): Since Apache Spark 3.3.0, SPARK-38383 supports the APP_ID and EXECUTOR_ID placeholders in annotations. Do we still need this specialized logic?

.endMetadata()
val k8sPod = k8sPodBuilder.build()
SparkPod(k8sPod, pod.container)
}
}

object YuniKornFeatureStep {
val AppIdAnnotationKey = "yunikorn.apache.org/app-id"
val SchedulerName = "yunikorn"
Review comment (dongjoon-hyun, Member, Aug 17, 2022): The Apache Spark naming scheme uses all-capital names for constants:

- val AppIdAnnotationKey = "yunikorn.apache.org/app-id"
- val SchedulerName = "yunikorn"
+ val APP_ID_ANNOTATION_KEY = "yunikorn.apache.org/app-id"
+ val SCHEDULER_NAME = "yunikorn"

BTW, you are already using this style elsewhere in this PR:

private[spark] object YuniKornTestsSuite extends SparkFunSuite {
  val YUNIKORN_FEATURE_STEP = classOf[YuniKornFeatureStep].getName
}

}
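Putting the new step together, a hedged sketch of how an application would opt in end to end. The two `featureSteps` keys and the scheduler-name key are existing Spark-on-K8s settings; the values come from this PR:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.features.YuniKornFeatureStep

// Sketch: route driver and executor pods to the YuniKorn scheduler and attach
// the feature step above so both pod types carry the app-id annotation.
val conf = new SparkConf()
  .set("spark.kubernetes.scheduler.name", YuniKornFeatureStep.SchedulerName)
  .set("spark.kubernetes.driver.pod.featureSteps", classOf[YuniKornFeatureStep].getName)
  .set("spark.kubernetes.executor.pod.featureSteps", classOf[YuniKornFeatureStep].getName)
```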
43 changes: 43 additions & 0 deletions resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/YuniKornFeatureStepSuite.scala
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._

class YuniKornFeatureStepSuite extends SparkFunSuite {
Review comment (dongjoon-hyun, Member, Aug 17, 2022): This PR seems to duplicate the existing test coverage. Could you remove this test suite? With the SPARK-38383 placeholder support mentioned above, the same annotations can be set with plain configuration:

.set("spark.kubernetes.driver.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}")
.set("spark.kubernetes.executor.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}")

test("SPARK-37809: Driver Pod with YuniKorn annotations") {
Review comment (Member): Should these tests also use yunikornTag?

val sparkConf = new SparkConf()
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
val step = new YuniKornFeatureStep()
step.init(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())
val annotations = configuredPod.pod.getMetadata.getAnnotations
assert(annotations.get(YuniKornFeatureStep.AppIdAnnotationKey) === kubernetesConf.appId)
}

test("SPARK-37809: Executor Pod with YuniKorn annotations") {
val sparkConf = new SparkConf()
val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf)
val step = new YuniKornFeatureStep()
step.init(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())
val annotations = configuredPod.pod.getMetadata.getAnnotations
assert(annotations.get(YuniKornFeatureStep.AppIdAnnotationKey) === kubernetesConf.appId)
}
}
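For contrast with the feature-step approach tested above, a hedged sketch of the configuration-only alternative the reviewer suggests, assuming the SPARK-38383 placeholder substitution available since Spark 3.3.0:

```scala
import org.apache.spark.SparkConf

// Sketch: no feature step needed; Spark substitutes {{APP_ID}} in pod
// annotations (SPARK-38383, Spark 3.3.0+), yielding the same pod shape.
val conf = new SparkConf()
  .set("spark.kubernetes.scheduler.name", "yunikorn")
  .set("spark.kubernetes.driver.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}")
  .set("spark.kubernetes.executor.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}")
```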
8 changes: 8 additions & 0 deletions resource-managers/kubernetes/integration-tests/pom.xml
@@ -48,6 +48,7 @@
<test.exclude.tags></test.exclude.tags>
<test.include.tags></test.include.tags>
<volcano.exclude>**/*Volcano*.scala</volcano.exclude>
+<yunikorn.exclude>**/*YuniKorn*.scala</yunikorn.exclude>
</properties>
<packaging>jar</packaging>
<name>Spark Project Kubernetes Integration Tests</name>
@@ -92,6 +93,7 @@
<configuration>
<excludes>
<exclude>${volcano.exclude}</exclude>
+<exclude>${yunikorn.exclude}</exclude>
</excludes>
</configuration>
</plugin>
@@ -242,5 +244,11 @@
</dependency>
</dependencies>
</profile>
+<profile>
+  <id>yunikorn</id>
+  <properties>
+    <yunikorn.exclude></yunikorn.exclude>
+  </properties>
+</profile>
</profiles>
</project>
34 changes: 34 additions & 0 deletions resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/YuniKornSuite.scala
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.integrationtest

import org.scalatest.Tag

import org.apache.spark.deploy.k8s.features.YuniKornFeatureStep

class YuniKornSuite extends KubernetesSuite with YuniKornTestsSuite {

override protected def setUpTest(): Unit = {
super.setUpTest()
sparkAppConf
.set("spark.kubernetes.scheduler.name", YuniKornFeatureStep.SchedulerName)
}
}

private[spark] object YuniKornSuite {
val yunikornTag = Tag("yunikorn")
}
60 changes: 60 additions & 0 deletions resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/YuniKornTestsSuite.scala
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.integrationtest

import io.fabric8.kubernetes.api.model.Pod

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.features.YuniKornFeatureStep
import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
import org.apache.spark.deploy.k8s.integrationtest.YuniKornSuite.yunikornTag

private[spark] trait YuniKornTestsSuite { k8sSuite: KubernetesSuite =>
import YuniKornTestsSuite._

protected def checkScheduler(pod: Pod): Unit = {
assert(pod.getSpec.getSchedulerName === YuniKornFeatureStep.SchedulerName)
}

protected def checkAnnotations(pod: Pod): Unit = {
val appId = pod.getMetadata.getLabels.get("spark-app-selector")
val annotations = pod.getMetadata.getAnnotations
assert(annotations.get(YuniKornFeatureStep.AppIdAnnotationKey) === appId)
}

test("Run SparkPi with yunikorn scheduler", k8sTestTag, yunikornTag) {
Review comment (Member): We need actual YuniKorn feature test coverage here; otherwise we cannot prevent future regressions. Technically, I don't think YuniKorn has fewer features than the Volcano scheduler. At a minimum we need test coverage similar to Volcano's, and it would be great to have more coverage of YuniKorn's specialties. FYI, we have at least the following tests for Volcano:

test("SPARK-38188: Run SparkPi jobs with 2 queues (only 1 enabled)", k8sTestTag, volcanoTag) {
test("SPARK-38188: Run SparkPi jobs with 2 queues (all enabled)", k8sTestTag, volcanoTag) {
test("SPARK-38423: Run driver job to validate priority order", k8sTestTag, volcanoTag) {

sparkAppConf
.set("spark.kubernetes.driver.pod.featureSteps", YUNIKORN_FEATURE_STEP)
.set("spark.kubernetes.executor.pod.featureSteps", YUNIKORN_FEATURE_STEP)
runSparkPiAndVerifyCompletion(
driverPodChecker = (driverPod: Pod) => {
doBasicDriverPodCheck(driverPod)
checkScheduler(driverPod)
checkAnnotations(driverPod)
},
executorPodChecker = (executorPod: Pod) => {
doBasicExecutorPodCheck(executorPod)
checkScheduler(executorPod)
checkAnnotations(executorPod)
}
)
}
}

private[spark] object YuniKornTestsSuite extends SparkFunSuite {
val YUNIKORN_FEATURE_STEP = classOf[YuniKornFeatureStep].getName
}
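Following the Volcano examples the reviewer lists, one possible shape for an additional tagged test in the trait above. This is a sketch only: it relies on the `sparkAppConf`, `runSparkPiAndVerifyCompletion`, and pod-check helpers that `KubernetesSuite` provides, and the `yunikorn.apache.org/queue` annotation key is an assumption about YuniKorn's annotation naming, not something this PR defines.

```scala
// Sketch of an extra YuniKorn-tagged test in the style of the Volcano suite.
// The queue annotation key below is assumed; check the YuniKorn docs.
test("Run SparkPi with yunikorn scheduler and an explicit queue", k8sTestTag, yunikornTag) {
  sparkAppConf
    .set("spark.kubernetes.driver.pod.featureSteps", YUNIKORN_FEATURE_STEP)
    .set("spark.kubernetes.executor.pod.featureSteps", YUNIKORN_FEATURE_STEP)
    // Assumed key: route the application to a specific YuniKorn queue.
    .set("spark.kubernetes.driver.annotation.yunikorn.apache.org/queue", "root.default")
    .set("spark.kubernetes.executor.annotation.yunikorn.apache.org/queue", "root.default")
  runSparkPiAndVerifyCompletion(
    driverPodChecker = (driverPod: Pod) => {
      doBasicDriverPodCheck(driverPod)
      checkScheduler(driverPod)
      checkAnnotations(driverPod)
    },
    executorPodChecker = (executorPod: Pod) => {
      doBasicExecutorPodCheck(executorPod)
      checkScheduler(executorPod)
      checkAnnotations(executorPod)
    }
  )
}
```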