From 10224353845298896b603cd2ccd3018d481d6e96 Mon Sep 17 00:00:00 2001 From: Yikun Jiang Date: Fri, 28 Jan 2022 17:05:02 +0800 Subject: [PATCH 1/5] Add Volcano feature step --- dev/deps/spark-deps-hadoop-2-hive-2.3 | 2 + dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 + resource-managers/kubernetes/core/pom.xml | 12 +++ .../org/apache/spark/deploy/k8s/Config.scala | 15 ++++ .../scheduler/VolcanoFeatureStep.scala | 75 +++++++++++++++++++ .../spark/deploy/k8s/PodBuilderSuite.scala | 13 ++++ .../scheduler/VolcanoFeatureStepSuite.scala | 52 +++++++++++++ 7 files changed, 171 insertions(+) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStep.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStepSuite.scala diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index 828423790476..11b6fc73eb3d 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -260,6 +260,8 @@ tink/1.6.0//tink-1.6.0.jar transaction-api/1.1//transaction-api-1.1.jar univocity-parsers/2.9.1//univocity-parsers-2.9.1.jar velocity/1.5//velocity-1.5.jar +volcano-client/5.12.0//volcano-client-5.12.0.jar +volcano-model-v1beta1/5.12.0//volcano-model-v1beta1-5.12.0.jar xbean-asm9-shaded/4.20//xbean-asm9-shaded-4.20.jar xercesImpl/2.12.0//xercesImpl-2.12.0.jar xml-apis/1.4.01//xml-apis-1.4.01.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index f1692777dadb..907a07a40157 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -246,6 +246,8 @@ tink/1.6.0//tink-1.6.0.jar transaction-api/1.1//transaction-api-1.1.jar univocity-parsers/2.9.1//univocity-parsers-2.9.1.jar velocity/1.5//velocity-1.5.jar +volcano-client/5.12.0//volcano-client-5.12.0.jar +volcano-model-v1beta1/5.12.0//volcano-model-v1beta1-5.12.0.jar wildfly-openssl/1.0.7.Final//wildfly-openssl-1.0.7.Final.jar xbean-asm9-shaded/4.20//xbean-asm9-shaded-4.20.jar xz/1.8//xz-1.8.jar diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 0cb5e115906a..3de399ea701b 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -52,6 +52,18 @@ test-jar + + io.fabric8 + volcano-client + ${kubernetes-client.version} + + + + io.fabric8 + volcano-model-v1beta1 + ${kubernetes-client.version} + + io.fabric8 kubernetes-client diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 65a8f8269966..ddcaeb0c4b43 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -23,6 +23,7 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON} import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.network.util.ByteUnit private[spark] object Config extends Logging { @@ -675,6 +676,20 @@ private[spark] object Config extends Logging { .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer") .createWithDefault(Int.MaxValue) + val KUBERNETES_JOB_MIN_MEMORY = ConfigBuilder("spark.kubernetes.job.min.memory") + .doc("The minimum memory for running the job, in MiB unless otherwise specified. This only " + + "applicable when you enable `VolcanoFeatureStep` feature step in" + + " `spark.kubernetes.driver.pod.featureSteps`.") + .version("3.3.0") + .bytesConf(ByteUnit.MiB) + .createWithDefaultString("3g") + val KUBERNETES_JOB_MIN_CPU = ConfigBuilder("spark.kubernetes.job.min.cpu") + .doc("The minimum CPU for running the job. This only applicable when you enable " + + "`VolcanoFeatureStep` feature step in `spark.kubernetes.driver.pod.featureSteps`.") + .version("3.3.0") + .doubleConf + .createWithDefault(2.0) + val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label." val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation." val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = "spark.kubernetes.driver.service.annotation." diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStep.scala new file mode 100644 index 000000000000..b621647aa4dc --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStep.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features.scheduler + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.features.{KubernetesDriverCustomFeatureConfigStep, KubernetesExecutorCustomFeatureConfigStep} + +private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureConfigStep + with KubernetesExecutorCustomFeatureConfigStep { + + private var kubernetesConf: KubernetesConf = _ + + private val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name" + + private lazy val podGroupName = s"${kubernetesConf.appId}-podgroup" + private lazy val minCpu = kubernetesConf.get(KUBERNETES_JOB_MIN_CPU) + private lazy val minMemory = kubernetesConf.get(KUBERNETES_JOB_MIN_MEMORY) + private lazy val namespace = kubernetesConf.namespace + + override def init(config: KubernetesDriverConf): Unit = { + kubernetesConf = config + } + + override def init(config: KubernetesExecutorConf): Unit = { + kubernetesConf = config + } + + override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = { + val cpuQ = new QuantityBuilder(false) + .withAmount(s"${minCpu}") + .build() + val memoryQ = new QuantityBuilder(false) + .withAmount(s"${minMemory}Mi") + .build() + val podGroup = new PodGroupBuilder() + .editOrNewMetadata() + .withName(podGroupName) + .withNamespace(namespace) + .endMetadata() + .editOrNewSpec() + .withMinResources(Map("cpu" -> cpuQ, "memory" -> memoryQ).asJava) + .endSpec() + .build() + Seq(podGroup) + } + + override def configurePod(pod: SparkPod): SparkPod = { + val k8sPodBuilder = new PodBuilder(pod.pod) + .editMetadata() + .addToAnnotations(POD_GROUP_ANNOTATION, podGroupName) + .endMetadata() + val k8sPod = k8sPodBuilder.build() + SparkPod(k8sPod, pod.container) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala index c076f22c7b14..5d65fd73031f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala @@ -124,6 +124,19 @@ abstract class PodBuilderSuite extends SparkFunSuite { assert(e.getMessage.contains("unknow.class")) } + test("SPARK-36061: configure volcano feature step") { + val client = mockKubernetesClient() + val sparkConf = baseConf.clone() + .set(userFeatureStepsConf.key, + "org.apache.spark.deploy.k8s.features.scheduler.VolcanoFeatureStep") + .set(templateFileConf.key, "template-file.yaml") + val pod = buildPod(sparkConf, client) + verifyPod(pod) + val annotation = pod.pod.getMetadata.getAnnotations + assert(annotation.get("scheduling.k8s.io/group-name") === + s"${KubernetesTestConf.APP_ID}-podgroup") + } + test("complain about misconfigured pod template") { val client = mockKubernetesClient( new PodBuilder() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStepSuite.scala new file mode 100644 index 000000000000..dbe3e719885e --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStepSuite.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features.scheduler + +import io.fabric8.kubernetes.api.model.Quantity +import io.fabric8.volcano.scheduling.v1beta1.PodGroup + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s._ + +class VolcanoFeatureStepSuite extends SparkFunSuite { + + test("SPARK-36061: Driver Pod with Volcano PodGroup") { + val sparkConf = new SparkConf() + val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf) + val step = new VolcanoFeatureStep() + step.init(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + val annotations = configuredPod.pod.getMetadata.getAnnotations + + assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup") + val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup] + assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup") + assert(podGroup.getSpec.getMinResources.get("cpu") === new Quantity("2.0")) + assert(podGroup.getSpec.getMinResources.get("memory") === new Quantity("3072")) + } + + test("SPARK-36061: Executor Pod with Volcano PodGroup") { + val sparkConf = new SparkConf() + val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf) + val step = new VolcanoFeatureStep() + step.init(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + val annotations = configuredPod.pod.getMetadata.getAnnotations + assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup") + } +} From 112660248979c8c7fe2bfe81d4a1473ed263aea4 Mon Sep 17 00:00:00 2001 From: Yikun Jiang Date: Mon, 7 Feb 2022 20:51:47 +0800 Subject: [PATCH 2/5] update config doc and remove redundant deps --- dev/deps/spark-deps-hadoop-2-hive-2.3 | 1 - dev/deps/spark-deps-hadoop-3-hive-2.3 | 1 - resource-managers/kubernetes/core/pom.xml | 6 ------ .../main/scala/org/apache/spark/deploy/k8s/Config.scala | 8 +++++--- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index 11b6fc73eb3d..24d6f874a541 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -260,7 +260,6 @@ tink/1.6.0//tink-1.6.0.jar transaction-api/1.1//transaction-api-1.1.jar univocity-parsers/2.9.1//univocity-parsers-2.9.1.jar velocity/1.5//velocity-1.5.jar -volcano-client/5.12.0//volcano-client-5.12.0.jar volcano-model-v1beta1/5.12.0//volcano-model-v1beta1-5.12.0.jar xbean-asm9-shaded/4.20//xbean-asm9-shaded-4.20.jar xercesImpl/2.12.0//xercesImpl-2.12.0.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 907a07a40157..5b6d226430bb 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -246,7 +246,6 @@ tink/1.6.0//tink-1.6.0.jar transaction-api/1.1//transaction-api-1.1.jar univocity-parsers/2.9.1//univocity-parsers-2.9.1.jar velocity/1.5//velocity-1.5.jar -volcano-client/5.12.0//volcano-client-5.12.0.jar volcano-model-v1beta1/5.12.0//volcano-model-v1beta1-5.12.0.jar wildfly-openssl/1.0.7.Final//wildfly-openssl-1.0.7.Final.jar xbean-asm9-shaded/4.20//xbean-asm9-shaded-4.20.jar diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 3de399ea701b..400939f0b0a0 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -52,12 +52,6 @@ test-jar - - io.fabric8 - volcano-client - ${kubernetes-client.version} - - io.fabric8 volcano-model-v1beta1 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index ddcaeb0c4b43..9ddb45592a08 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -20,6 +20,7 @@ import java.util.Locale import java.util.concurrent.TimeUnit import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.features.scheduler.VolcanoFeatureStep import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON} import org.apache.spark.internal.config.ConfigBuilder @@ -678,14 +679,15 @@ private[spark] object Config extends Logging { val KUBERNETES_JOB_MIN_MEMORY = ConfigBuilder("spark.kubernetes.job.min.memory") .doc("The minimum memory for running the job, in MiB unless otherwise specified. This only " + - "applicable when you enable `VolcanoFeatureStep` feature step in" + + s"applicable when you enable `${classOf[VolcanoFeatureStep].getName}` feature step in" + " `spark.kubernetes.driver.pod.featureSteps`.") .version("3.3.0") .bytesConf(ByteUnit.MiB) .createWithDefaultString("3g") val KUBERNETES_JOB_MIN_CPU = ConfigBuilder("spark.kubernetes.job.min.cpu") - .doc("The minimum CPU for running the job. This only applicable when you enable " + - "`VolcanoFeatureStep` feature step in `spark.kubernetes.driver.pod.featureSteps`.") + .doc(s"The minimum CPU for running the job. This only applicable when you enable " + + s"`${classOf[VolcanoFeatureStep].getName}` feature step in " + + "`spark.kubernetes.driver.pod.featureSteps`.") .version("3.3.0") .doubleConf .createWithDefault(2.0) From c8d7f5c479561c089496fc07c875b9d2e271b6f8 Mon Sep 17 00:00:00 2001 From: Yikun Jiang Date: Tue, 8 Feb 2022 14:53:52 +0800 Subject: [PATCH 3/5] add -Pvolcano --- dev/deps/spark-deps-hadoop-2-hive-2.3 | 1 - dev/deps/spark-deps-hadoop-3-hive-2.3 | 1 - project/SparkBuild.scala | 7 ++++ resource-managers/kubernetes/core/pom.xml | 36 +++++++++++++++---- .../org/apache/spark/deploy/k8s/Config.scala | 17 --------- .../VolcanoFeatureStep.scala | 16 +-------- .../spark/deploy/k8s/PodBuilderSuite.scala | 13 ------- .../VolcanoFeatureStepSuite.scala | 5 +-- 8 files changed, 39 insertions(+), 57 deletions(-) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/{scheduler => volcano}/VolcanoFeatureStep.scala (79%) rename resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/{scheduler => volcano}/VolcanoFeatureStepSuite.scala (88%) diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index 24d6f874a541..828423790476 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -260,7 +260,6 @@ tink/1.6.0//tink-1.6.0.jar transaction-api/1.1//transaction-api-1.1.jar univocity-parsers/2.9.1//univocity-parsers-2.9.1.jar velocity/1.5//velocity-1.5.jar -volcano-model-v1beta1/5.12.0//volcano-model-v1beta1-5.12.0.jar xbean-asm9-shaded/4.20//xbean-asm9-shaded-4.20.jar xercesImpl/2.12.0//xercesImpl-2.12.0.jar xml-apis/1.4.01//xml-apis-1.4.01.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 5b6d226430bb..f1692777dadb 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -246,7 +246,6 @@ tink/1.6.0//tink-1.6.0.jar transaction-api/1.1//transaction-api-1.1.jar univocity-parsers/2.9.1//univocity-parsers-2.9.1.jar velocity/1.5//velocity-1.5.jar -volcano-model-v1beta1/5.12.0//volcano-model-v1beta1-5.12.0.jar wildfly-openssl/1.0.7.Final//wildfly-openssl-1.0.7.Final.jar xbean-asm9-shaded/4.20//xbean-asm9-shaded-4.20.jar xz/1.8//xz-1.8.jar diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 3d3a65f3d233..97f9d3101b7a 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -421,6 +421,13 @@ object SparkBuild extends PomBuild { // SPARK-14738 - Remove docker tests from main Spark build // enable(DockerIntegrationTests.settings)(dockerIntegrationTests) + if (!profiles.contains("volcano")) { + enable(Seq( + Compile / unmanagedSources / excludeFilter := HiddenFileFilter || "VolcanoFeatureStep.scala", + Test / unmanagedSources / excludeFilter := HiddenFileFilter || "VolcanoFeatureStepSuite.scala" + ))(kubernetes) + } + enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests) enable(YARN.settings)(yarn) diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 400939f0b0a0..96337697ac9d 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -29,8 +29,25 @@ Spark Project Kubernetes kubernetes + **/volcano/*.scala + + + volcano + + + + + + io.fabric8 + volcano-model-v1beta1 + ${kubernetes-client.version} + + + + + org.apache.spark @@ -52,12 +69,6 @@ test-jar - - io.fabric8 - volcano-model-v1beta1 - ${kubernetes-client.version} - - io.fabric8 kubernetes-client @@ -109,6 +120,19 @@ + + + + net.alchim31.maven + scala-maven-plugin + + + ${volcano.exclude} + + + + + target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 9ddb45592a08..65a8f8269966 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -20,11 +20,9 @@ import java.util.Locale import java.util.concurrent.TimeUnit import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.scheduler.VolcanoFeatureStep import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON} import org.apache.spark.internal.config.ConfigBuilder -import org.apache.spark.network.util.ByteUnit private[spark] object Config extends Logging { @@ -677,21 +675,6 @@ private[spark] object Config extends Logging { .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer") .createWithDefault(Int.MaxValue) - val KUBERNETES_JOB_MIN_MEMORY = ConfigBuilder("spark.kubernetes.job.min.memory") - .doc("The minimum memory for running the job, in MiB unless otherwise specified. This only " + - s"applicable when you enable `${classOf[VolcanoFeatureStep].getName}` feature step in" + - " `spark.kubernetes.driver.pod.featureSteps`.") - .version("3.3.0") - .bytesConf(ByteUnit.MiB) - .createWithDefaultString("3g") - val KUBERNETES_JOB_MIN_CPU = ConfigBuilder("spark.kubernetes.job.min.cpu") - .doc(s"The minimum CPU for running the job. This only applicable when you enable " + - s"`${classOf[VolcanoFeatureStep].getName}` feature step in " + - "`spark.kubernetes.driver.pod.featureSteps`.") - .version("3.3.0") - .doubleConf - .createWithDefault(2.0) - val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label." val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation." val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = "spark.kubernetes.driver.service.annotation." diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/volcano/VolcanoFeatureStep.scala similarity index 79% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStep.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/volcano/VolcanoFeatureStep.scala index b621647aa4dc..ef1cc5748171 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/volcano/VolcanoFeatureStep.scala @@ -14,15 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.k8s.features.scheduler - -import scala.collection.JavaConverters._ +package org.apache.spark.deploy.k8s.features.volcano import io.fabric8.kubernetes.api.model._ import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod} -import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.features.{KubernetesDriverCustomFeatureConfigStep, KubernetesExecutorCustomFeatureConfigStep} private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureConfigStep @@ -33,8 +30,6 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon private val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name" private lazy val podGroupName = s"${kubernetesConf.appId}-podgroup" - private lazy val minCpu = kubernetesConf.get(KUBERNETES_JOB_MIN_CPU) - private lazy val minMemory = kubernetesConf.get(KUBERNETES_JOB_MIN_MEMORY) private lazy val namespace = kubernetesConf.namespace override def init(config: KubernetesDriverConf): Unit = { @@ -46,20 +41,11 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon } override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = { - val cpuQ = new QuantityBuilder(false) - .withAmount(s"${minCpu}") - .build() - val memoryQ = new QuantityBuilder(false) - .withAmount(s"${minMemory}Mi") - .build() val podGroup = new PodGroupBuilder() .editOrNewMetadata() .withName(podGroupName) .withNamespace(namespace) .endMetadata() - .editOrNewSpec() - .withMinResources(Map("cpu" -> cpuQ, "memory" -> memoryQ).asJava) - .endSpec() .build() Seq(podGroup) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala index 5d65fd73031f..c076f22c7b14 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala @@ -124,19 +124,6 @@ abstract class PodBuilderSuite extends SparkFunSuite { assert(e.getMessage.contains("unknow.class")) } - test("SPARK-36061: configure volcano feature step") { - val client = mockKubernetesClient() - val sparkConf = baseConf.clone() - .set(userFeatureStepsConf.key, - "org.apache.spark.deploy.k8s.features.scheduler.VolcanoFeatureStep") - .set(templateFileConf.key, "template-file.yaml") - val pod = buildPod(sparkConf, client) - verifyPod(pod) - val annotation = pod.pod.getMetadata.getAnnotations - assert(annotation.get("scheduling.k8s.io/group-name") === - s"${KubernetesTestConf.APP_ID}-podgroup") - } - test("complain about misconfigured pod template") { val client = mockKubernetesClient( new PodBuilder() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/volcano/VolcanoFeatureStepSuite.scala similarity index 88% rename from resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStepSuite.scala rename to resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/volcano/VolcanoFeatureStepSuite.scala index dbe3e719885e..d172c570e9e1 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/scheduler/VolcanoFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/volcano/VolcanoFeatureStepSuite.scala @@ -14,9 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.k8s.features.scheduler +package org.apache.spark.deploy.k8s.features.volcano -import io.fabric8.kubernetes.api.model.Quantity import io.fabric8.volcano.scheduling.v1beta1.PodGroup import org.apache.spark.{SparkConf, SparkFunSuite} @@ -36,8 +35,6 @@ class VolcanoFeatureStepSuite extends SparkFunSuite { assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup") val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup] assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup") - assert(podGroup.getSpec.getMinResources.get("cpu") === new Quantity("2.0")) - assert(podGroup.getSpec.getMinResources.get("memory") === new Quantity("3072")) } test("SPARK-36061: Executor Pod with Volcano PodGroup") { From 1135dbe29534e032c9b648b9465436cee6b97479 Mon Sep 17 00:00:00 2001 From: Yikun Jiang Date: Thu, 10 Feb 2022 14:34:02 +0800 Subject: [PATCH 4/5] Add Volcano Kuberentes integration test --- .github/workflows/build_and_test.yml | 4 +- project/SparkBuild.scala | 3 + .../kubernetes/integration-tests/pom.xml | 27 ++++++++ .../k8s/integrationtest/KubernetesSuite.scala | 7 ++- .../k8s/integrationtest/VolcanoSuite.scala | 27 ++++++++ .../integrationtest/VolcanoTestsSuite.scala | 63 +++++++++++++++++++ 6 files changed, 128 insertions(+), 3 deletions(-) create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 4529cd9ba4c2..9edf5efd356b 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -614,7 +614,7 @@ jobs: export MAVEN_CLI_OPTS="--no-transfer-progress" export JAVA_VERSION=${{ matrix.java }} # It uses Maven's 'install' intentionally, see https://github.com/apache/spark/pull/26414. - ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install + ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install rm -rf ~/.m2/repository/org/apache/spark scala-213: @@ -660,7 +660,7 @@ jobs: - name: Build with SBT run: | ./dev/change-scala-version.sh 2.13 - ./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile test:compile + ./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile test:compile tpcds-1g: needs: [configure-jobs, precondition] diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 97f9d3101b7a..3374750b7934 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -426,6 +426,9 @@ object SparkBuild extends PomBuild { Compile / unmanagedSources / excludeFilter := HiddenFileFilter || "VolcanoFeatureStep.scala", Test / unmanagedSources / excludeFilter := HiddenFileFilter || "VolcanoFeatureStepSuite.scala" ))(kubernetes) + enable(Seq( + Test / unmanagedSources / excludeFilter := HiddenFileFilter || "Volcano*.scala" + ))(kubernetesIntegrationTests) } enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests) diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index a44cedb9e1e2..e6a93b9cb6ae 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -47,6 +47,7 @@ + **/Volcano*.scala jar Spark Project Kubernetes Integration Tests @@ -77,6 +78,19 @@ + + + + net.alchim31.maven + scala-maven-plugin + + + ${volcano.exclude} + + + + + org.codehaus.mojo @@ -209,5 +223,18 @@ + + volcano + + + + + + io.fabric8 + volcano-client + ${kubernetes-client.version} + + + diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index c1237e3eb9df..c43080928772 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -181,7 +181,7 @@ class KubernetesSuite extends SparkFunSuite } } - before { + protected def setUpTest(): Unit = { appLocator = UUID.randomUUID().toString.replaceAll("-", "") driverPodName = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "") sparkAppConf = kubernetesTestComponents.newSparkAppConf() @@ -195,6 +195,10 @@ class KubernetesSuite extends SparkFunSuite } } + before { + setUpTest() + } + after { if (!kubernetesTestComponents.hasUserSpecifiedNamespace) { kubernetesTestComponents.deleteNamespace() @@ -598,6 +602,7 @@ private[spark] object KubernetesSuite { val localTestTag = Tag("local") val rTestTag = Tag("r") val MinikubeTag = Tag("minikube") + val volcanoTag = Tag("volcano") val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi" val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest" val SPARK_MINI_READ_WRITE_TEST = "org.apache.spark.examples.MiniReadWriteTest" diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala new file mode 100644 index 000000000000..11921abfe227 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +class VolcanoSuite extends KubernetesSuite with VolcanoTestsSuite { + + override protected def setUpTest(): Unit = { + super.setUpTest() + sparkAppConf + .set("spark.kubernetes.driver.scheduler.name", "volcano") + .set("spark.kubernetes.executor.scheduler.name", "volcano") + } +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala new file mode 100644 index 000000000000..440283e28196 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.volcano.client.VolcanoClient + +import org.apache.spark.SparkFunSuite + + +private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite => + import VolcanoTestsSuite._ + import KubernetesSuite.volcanoTag + + protected def checkAnnotaion(pod: Pod): Unit = { + val appId = pod.getMetadata.getLabels.get("spark-app-selector") + val annotations = pod.getMetadata.getAnnotations + assert(annotations.get("scheduling.k8s.io/group-name") === s"$appId-podgroup") + } + + protected def checkPodGroup(pod: Pod): Unit = { + val appId = pod.getMetadata.getLabels.get("spark-app-selector") + val podGroupName = s"$appId-podgroup" + val volcanoClient = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient]) + val podGroup = volcanoClient.podGroups().withName(podGroupName).get() + assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === pod.getMetadata.getName) + } + + test("Run SparkPi with volcano scheduler", volcanoTag) { + sparkAppConf + .set("spark.kubernetes.driver.pod.featureSteps", VOLCANO_FEATURE_STEP) + .set("spark.kubernetes.executor.pod.featureSteps", VOLCANO_FEATURE_STEP) + runSparkPiAndVerifyCompletion( + driverPodChecker = (driverPod: Pod) => { + doBasicDriverPodCheck(driverPod) + checkAnnotaion(driverPod) + checkPodGroup(driverPod) + }, + executorPodChecker = (executorPod: Pod) => { + doBasicExecutorPodCheck(executorPod) + checkAnnotaion(executorPod) + } + ) + } +} + +private[spark] object VolcanoTestsSuite extends SparkFunSuite { + val VOLCANO_FEATURE_STEP = "org.apache.spark.deploy.k8s.features.volcano.VolcanoFeatureStep" +} From 503b4ed98095aa06691c4991d7918fb90934c94c Mon Sep 17 00:00:00 2001 From: Yikun Jiang Date: Fri, 11 Feb 2022 11:24:32 +0800 Subject: [PATCH 5/5] Adress comments --- dev/scalastyle | 2 +- project/SparkBuild.scala | 16 +++++++++------- resource-managers/kubernetes/core/pom.xml | 2 +- .../{volcano => }/VolcanoFeatureStep.scala | 3 +-- .../{volcano => }/VolcanoFeatureStepSuite.scala | 2 +- .../kubernetes/integration-tests/pom.xml | 8 +++++++- .../k8s/integrationtest/KubernetesSuite.scala | 1 - .../k8s/integrationtest/VolcanoSuite.scala | 6 ++++++ .../k8s/integrationtest/VolcanoTestsSuite.scala | 15 +++++++++++---- 9 files changed, 37 insertions(+), 18 deletions(-) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/{volcano => }/VolcanoFeatureStep.scala (92%) rename resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/{volcano => }/VolcanoFeatureStepSuite.scala (97%) diff --git a/dev/scalastyle b/dev/scalastyle index 212ef900eb9b..5f958b8fb0a7 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -17,7 +17,7 @@ # limitations under the License. # -SPARK_PROFILES=${1:-"-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"} +SPARK_PROFILES=${1:-"-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive -Pvolcano"} # NOTE: echo "q" is needed because SBT prompts the user for input on encountering a build file # with failure (either resolution or compilation); the "q" makes SBT quit. diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 3374750b7934..1b8b258af277 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -422,13 +422,8 @@ object SparkBuild extends PomBuild { // enable(DockerIntegrationTests.settings)(dockerIntegrationTests) if (!profiles.contains("volcano")) { - enable(Seq( - Compile / unmanagedSources / excludeFilter := HiddenFileFilter || "VolcanoFeatureStep.scala", - Test / unmanagedSources / excludeFilter := HiddenFileFilter || "VolcanoFeatureStepSuite.scala" - ))(kubernetes) - enable(Seq( - Test / unmanagedSources / excludeFilter := HiddenFileFilter || "Volcano*.scala" - ))(kubernetesIntegrationTests) + enable(Volcano.settings)(kubernetes) + enable(Volcano.settings)(kubernetesIntegrationTests) } enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests) @@ -966,6 +961,13 @@ object SparkR { ) } +object Volcano { + // Exclude all volcano file for Compile and Test + lazy val settings = Seq( + unmanagedSources / excludeFilter := HiddenFileFilter || "*Volcano*.scala" + ) +} + object Unidoc { import BuildCommons._ diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 96337697ac9d..6eb357ef2490 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -29,7 +29,7 @@ Spark Project Kubernetes kubernetes - **/volcano/*.scala + **/*Volcano*.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/volcano/VolcanoFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala similarity index 92% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/volcano/VolcanoFeatureStep.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala index ef1cc5748171..1c936848db67 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/volcano/VolcanoFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala @@ -14,13 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.k8s.features.volcano +package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model._ import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod} -import org.apache.spark.deploy.k8s.features.{KubernetesDriverCustomFeatureConfigStep, KubernetesExecutorCustomFeatureConfigStep} private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureConfigStep with KubernetesExecutorCustomFeatureConfigStep { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/volcano/VolcanoFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala similarity index 97% rename from resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/volcano/VolcanoFeatureStepSuite.scala rename to resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala index d172c570e9e1..cf337f99cab9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/volcano/VolcanoFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.k8s.features.volcano +package org.apache.spark.deploy.k8s.features import io.fabric8.volcano.scheduling.v1beta1.PodGroup diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index e6a93b9cb6ae..0bc8508cbf86 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -47,7 +47,7 @@ - **/Volcano*.scala + **/*Volcano*.scala jar Spark Project Kubernetes Integration Tests @@ -75,6 +75,12 @@ spark-tags_${scala.binary.version} test-jar + + org.apache.spark + spark-kubernetes_${scala.binary.version} + ${project.version} + test + diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index c43080928772..69b736951301 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -602,7 +602,6 @@ private[spark] object KubernetesSuite { val localTestTag = Tag("local") val rTestTag = Tag("r") val MinikubeTag = Tag("minikube") - val volcanoTag = Tag("volcano") val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi" val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest" val SPARK_MINI_READ_WRITE_TEST = "org.apache.spark.examples.MiniReadWriteTest" diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala index 11921abfe227..ed7371718f9a 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.deploy.k8s.integrationtest +import org.scalatest.Tag + class VolcanoSuite extends KubernetesSuite with VolcanoTestsSuite { override protected def setUpTest(): Unit = { @@ -25,3 +27,7 @@ class VolcanoSuite extends KubernetesSuite with VolcanoTestsSuite { .set("spark.kubernetes.executor.scheduler.name", "volcano") } } + +private[spark] object VolcanoSuite { + val volcanoTag = Tag("volcano") +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala index 440283e28196..377a1b816798 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala @@ -20,11 +20,16 @@ import io.fabric8.kubernetes.api.model.Pod import io.fabric8.volcano.client.VolcanoClient import org.apache.spark.SparkFunSuite - +import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag +import org.apache.spark.deploy.k8s.integrationtest.VolcanoSuite.volcanoTag private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite => import VolcanoTestsSuite._ - import KubernetesSuite.volcanoTag + + protected def checkScheduler(pod: Pod): Unit = { + assert(pod.getSpec.getSchedulerName === "volcano") + } protected def checkAnnotaion(pod: Pod): Unit = { val appId = pod.getMetadata.getLabels.get("spark-app-selector") @@ -40,18 +45,20 @@ private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite => assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === pod.getMetadata.getName) } - test("Run SparkPi with volcano scheduler", volcanoTag) { + test("Run SparkPi with volcano scheduler", k8sTestTag, volcanoTag) { sparkAppConf .set("spark.kubernetes.driver.pod.featureSteps", VOLCANO_FEATURE_STEP) .set("spark.kubernetes.executor.pod.featureSteps", VOLCANO_FEATURE_STEP) runSparkPiAndVerifyCompletion( driverPodChecker = (driverPod: Pod) => { doBasicDriverPodCheck(driverPod) + checkScheduler(driverPod) checkAnnotaion(driverPod) checkPodGroup(driverPod) }, executorPodChecker = (executorPod: Pod) => { doBasicExecutorPodCheck(executorPod) + checkScheduler(executorPod) checkAnnotaion(executorPod) } ) @@ -59,5 +66,5 @@ private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite => } private[spark] object VolcanoTestsSuite extends SparkFunSuite { - val VOLCANO_FEATURE_STEP = "org.apache.spark.deploy.k8s.features.volcano.VolcanoFeatureStep" + val VOLCANO_FEATURE_STEP = classOf[VolcanoFeatureStep].getName }