diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index b9355c3a709d7..7cb90d8d20ccf 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1420,7 +1420,9 @@ See the [configuration page](configuration.html) for information on Spark config
Class names of an extra driver pod feature step implementing
`KubernetesFeatureConfigStep`. This is a developer API. Comma separated.
- Runs after all of Spark internal feature steps.
+ Runs after all of Spark internal feature steps. Since 3.3.0, your driver feature step
+ can implement `KubernetesDriverCustomFeatureConfigStep` where the driver config
+ is also available.
|
3.2.0 |
@@ -1430,7 +1432,9 @@ See the [configuration page](configuration.html) for information on Spark config
Class names of an extra executor pod feature step implementing
`KubernetesFeatureConfigStep`. This is a developer API. Comma separated.
- Runs after all of Spark internal feature steps.
+ Runs after all of Spark internal feature steps. Since 3.3.0, your executor feature step
+ can implement `KubernetesExecutorCustomFeatureConfigStep` where the executor config
+ is also available.
|
3.2.0 |
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index a2ad0d0a52a7f..5542794599a62 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -341,7 +341,9 @@ private[spark] object Config extends Logging {
ConfigBuilder("spark.kubernetes.driver.pod.featureSteps")
.doc("Class names of an extra driver pod feature step implementing " +
"KubernetesFeatureConfigStep. This is a developer API. Comma separated. " +
- "Runs after all of Spark internal feature steps.")
+ "Runs after all of Spark internal feature steps. Since 3.3.0, your driver feature " +
+ "step can implement `KubernetesDriverCustomFeatureConfigStep` where the driver " +
+ "config is also available.")
.version("3.2.0")
.stringConf
.toSequence
@@ -351,7 +353,9 @@ private[spark] object Config extends Logging {
ConfigBuilder("spark.kubernetes.executor.pod.featureSteps")
.doc("Class name of an extra executor pod feature step implementing " +
"KubernetesFeatureConfigStep. This is a developer API. Comma separated. " +
- "Runs after all of Spark internal feature steps.")
+ "Runs after all of Spark internal feature steps. Since 3.3.0, your executor feature " +
+ "step can implement `KubernetesExecutorCustomFeatureConfigStep` where the executor " +
+ "config is also available.")
.version("3.2.0")
.stringConf
.toSequence
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesDriverCustomFeatureConfigStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesDriverCustomFeatureConfigStep.scala
new file mode 100644
index 0000000000000..bbd05e9f67c51
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesDriverCustomFeatureConfigStep.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import org.apache.spark.annotation.{DeveloperApi, Unstable}
+import org.apache.spark.deploy.k8s.KubernetesDriverConf
+
+/**
+ * :: DeveloperApi ::
+ *
+ * A base interface to help user extend custom feature step in driver side.
+ * Note: If your custom feature step would be used only in driver or both in driver and executor,
+ * please use this.
+ */
+@Unstable
+@DeveloperApi
+trait KubernetesDriverCustomFeatureConfigStep extends KubernetesFeatureConfigStep {
+ /**
+ * Initialize the configuration for driver user feature step, this only applicable when user
+ * specified `spark.kubernetes.driver.pod.featureSteps`, the init will be called after feature
+ * step loading.
+ */
+ def init(config: KubernetesDriverConf): Unit
+}
+
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesExecutorCustomFeatureConfigStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesExecutorCustomFeatureConfigStep.scala
new file mode 100644
index 0000000000000..062fa7dbf1413
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesExecutorCustomFeatureConfigStep.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import org.apache.spark.annotation.{DeveloperApi, Unstable}
+import org.apache.spark.deploy.k8s.KubernetesExecutorConf
+
+/**
+ * :: DeveloperApi ::
+ *
+ * A base interface to help user extend custom feature step in executor side.
+ * Note: If your custom feature step would be used only in driver or both in driver and executor,
+ * please use this.
+ */
+@Unstable
+@DeveloperApi
+trait KubernetesExecutorCustomFeatureConfigStep extends KubernetesFeatureConfigStep {
+ /**
+ * Initialize the configuration for executor user feature step, this only applicable when user
+ * specified `spark.kubernetes.executor.pod.featureSteps` the init will be called after feature
+ * step loading.
+ */
+ def init(config: KubernetesExecutorConf): Unit
+}
+
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index f0c78f371d6d2..e89e52f1af201 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.submit
import io.fabric8.kubernetes.client.KubernetesClient
+import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.util.Utils
@@ -39,7 +40,26 @@ private[spark] class KubernetesDriverBuilder {
val userFeatures = conf.get(Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS)
.map { className =>
- Utils.classForName(className).newInstance().asInstanceOf[KubernetesFeatureConfigStep]
+ val feature = Utils.classForName[Any](className).newInstance()
+ val initializedFeature = feature match {
+ // Since 3.3, allow user to implement feature with KubernetesDriverConf
+ case d: KubernetesDriverCustomFeatureConfigStep =>
+ d.init(conf)
+ Some(d)
+ // raise SparkException with wrong type feature step
+ case _: KubernetesExecutorCustomFeatureConfigStep =>
+ None
+ // Since 3.2, allow user to implement feature without config
+ case f: KubernetesFeatureConfigStep =>
+ Some(f)
+ case _ => None
+ }
+ initializedFeature.getOrElse {
+ throw new SparkException(s"Failed to initialize feature step: $className, " +
+ s"please make sure your driver side feature steps are implemented by " +
+ s"`${classOf[KubernetesDriverCustomFeatureConfigStep].getName}` or " +
+ s"`${classOf[KubernetesFeatureConfigStep].getName}`.")
+ }
}
val features = Seq(
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 1a62d08a7b413..1f6d72cb7eee0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s
import io.fabric8.kubernetes.client.KubernetesClient
-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SparkException}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.resource.ResourceProfile
@@ -43,7 +43,26 @@ private[spark] class KubernetesExecutorBuilder {
val userFeatures = conf.get(Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS)
.map { className =>
- Utils.classForName(className).newInstance().asInstanceOf[KubernetesFeatureConfigStep]
+ val feature = Utils.classForName[Any](className).newInstance()
+ val initializedFeature = feature match {
+ // Since 3.3, allow user to implement feature with KubernetesExecutorConf
+ case e: KubernetesExecutorCustomFeatureConfigStep =>
+ e.init(conf)
+ Some(e)
+ // raise SparkException with wrong type feature step
+ case _: KubernetesDriverCustomFeatureConfigStep =>
+ None
+ // Since 3.2, allow user to implement feature without config
+ case f: KubernetesFeatureConfigStep =>
+ Some(f)
+ case _ => None
+ }
+ initializedFeature.getOrElse {
+ throw new SparkException(s"Failed to initialize feature step: $className, " +
+ s"please make sure your executor side feature steps are implemented by " +
+ s"`${classOf[KubernetesExecutorCustomFeatureConfigStep].getSimpleName}` or " +
+ s"`${classOf[KubernetesFeatureConfigStep].getSimpleName}`.")
+ }
}
val features = Seq(
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
index a8a3ca4eea965..c076f22c7b141 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
@@ -26,15 +26,22 @@ import org.mockito.Mockito.{mock, never, verify, when}
import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
-import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+import org.apache.spark.deploy.k8s.features.{KubernetesDriverCustomFeatureConfigStep, KubernetesExecutorCustomFeatureConfigStep, KubernetesFeatureConfigStep}
import org.apache.spark.internal.config.ConfigEntry
abstract class PodBuilderSuite extends SparkFunSuite {
+ val POD_ROLE: String
+ val TEST_ANNOTATION_KEY: String
+ val TEST_ANNOTATION_VALUE: String
protected def templateFileConf: ConfigEntry[_]
protected def userFeatureStepsConf: ConfigEntry[_]
+ protected def userFeatureStepWithExpectedAnnotation: (String, String)
+
+ protected def wrongTypeFeatureStep: String
+
protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod
protected val baseConf = new SparkConf(false)
@@ -66,6 +73,57 @@ abstract class PodBuilderSuite extends SparkFunSuite {
assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "so_long_two"))
}
+ test("SPARK-37145: configure a custom test step with base config") {
+ val client = mockKubernetesClient()
+ val sparkConf = baseConf.clone()
+ .set(userFeatureStepsConf.key,
+ "org.apache.spark.deploy.k8s.TestStepWithConf")
+ .set(templateFileConf.key, "template-file.yaml")
+ .set("test-features-key", "test-features-value")
+ val pod = buildPod(sparkConf, client)
+ verifyPod(pod)
+ val metadata = pod.pod.getMetadata
+ assert(metadata.getAnnotations.containsKey("test-features-key"))
+ assert(metadata.getAnnotations.get("test-features-key") === "test-features-value")
+ }
+
+ test("SPARK-37145: configure a custom test step with driver or executor config") {
+ val client = mockKubernetesClient()
+ val (featureSteps, annotation) = userFeatureStepWithExpectedAnnotation
+ val sparkConf = baseConf.clone()
+ .set(templateFileConf.key, "template-file.yaml")
+ .set(userFeatureStepsConf.key, featureSteps)
+ .set(TEST_ANNOTATION_KEY, annotation)
+ val pod = buildPod(sparkConf, client)
+ verifyPod(pod)
+ val metadata = pod.pod.getMetadata
+ assert(metadata.getAnnotations.containsKey(TEST_ANNOTATION_KEY))
+ assert(metadata.getAnnotations.get(TEST_ANNOTATION_KEY) === annotation)
+ }
+
+ test("SPARK-37145: configure a custom test step with wrong type config") {
+ val client = mockKubernetesClient()
+ val sparkConf = baseConf.clone()
+ .set(templateFileConf.key, "template-file.yaml")
+ .set(userFeatureStepsConf.key, wrongTypeFeatureStep)
+ val e = intercept[SparkException] {
+ buildPod(sparkConf, client)
+ }
+ assert(e.getMessage.contains(s"please make sure your $POD_ROLE side feature steps"))
+ }
+
+ test("SPARK-37145: configure a custom test step with wrong name") {
+ val client = mockKubernetesClient()
+ val featureSteps = "unknow.class"
+ val sparkConf = baseConf.clone()
+ .set(templateFileConf.key, "template-file.yaml")
+ .set(userFeatureStepsConf.key, featureSteps)
+ val e = intercept[ClassNotFoundException] {
+ buildPod(sparkConf, client)
+ }
+ assert(e.getMessage.contains("unknow.class"))
+ }
+
test("complain about misconfigured pod template") {
val client = mockKubernetesClient(
new PodBuilder()
@@ -249,3 +307,30 @@ class TestStepTwo extends KubernetesFeatureConfigStep {
SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
}
}
+
+/**
+ * A test user feature step would be used in driver and executor.
+ */
+class TestStepWithConf extends KubernetesDriverCustomFeatureConfigStep
+ with KubernetesExecutorCustomFeatureConfigStep {
+ import io.fabric8.kubernetes.api.model._
+
+ private var kubernetesConf: KubernetesConf = _
+
+ override def init(conf: KubernetesDriverConf): Unit = {
+ kubernetesConf = conf
+ }
+
+ override def init(conf: KubernetesExecutorConf): Unit = {
+ kubernetesConf = conf
+ }
+
+ override def configurePod(pod: SparkPod): SparkPod = {
+ val k8sPodBuilder = new PodBuilder(pod.pod)
+ .editOrNewMetadata()
+ .addToAnnotations("test-features-key", kubernetesConf.get("test-features-key"))
+ .endMetadata()
+ val k8sPod = k8sPodBuilder.build()
+ SparkPod(k8sPod, pod.container)
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index 8bf43d909dee3..5389a880d1b4a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -22,10 +22,13 @@ import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s._
-import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+import org.apache.spark.deploy.k8s.features.{KubernetesDriverCustomFeatureConfigStep, KubernetesFeatureConfigStep}
import org.apache.spark.internal.config.ConfigEntry
class KubernetesDriverBuilderSuite extends PodBuilderSuite {
+ val POD_ROLE: String = "driver"
+ val TEST_ANNOTATION_KEY: String = "driver-annotation-key"
+ val TEST_ANNOTATION_VALUE: String = "driver-annotation-value"
override protected def templateFileConf: ConfigEntry[_] = {
Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE
@@ -35,6 +38,14 @@ class KubernetesDriverBuilderSuite extends PodBuilderSuite {
Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS
}
+ override protected def userFeatureStepWithExpectedAnnotation: (String, String) = {
+ ("org.apache.spark.deploy.k8s.submit.TestStepWithDrvConf", TEST_ANNOTATION_VALUE)
+ }
+
+ override protected def wrongTypeFeatureStep: String = {
+ "org.apache.spark.scheduler.cluster.k8s.TestStepWithExecConf"
+ }
+
override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = {
val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
new KubernetesDriverBuilder().buildFromFeatures(conf, client).pod
@@ -82,3 +93,27 @@ class TestStep extends KubernetesFeatureConfigStep {
.build()
)
}
+
+
+/**
+ * A test driver user feature step would be used in only driver.
+ */
+class TestStepWithDrvConf extends KubernetesDriverCustomFeatureConfigStep {
+ import io.fabric8.kubernetes.api.model._
+
+ private var driverConf: KubernetesDriverConf = _
+
+ override def init(config: KubernetesDriverConf): Unit = {
+ driverConf = config
+ }
+
+ override def configurePod(pod: SparkPod): SparkPod = {
+ val k8sPodBuilder = new PodBuilder(pod.pod)
+ .editOrNewMetadata()
+ // The annotation key = TEST_ANNOTATION_KEY, value = TEST_ANNOTATION_VALUE
+ .addToAnnotations("driver-annotation-key", driverConf.get("driver-annotation-key"))
+ .endMetadata()
+ val k8sPod = k8sPodBuilder.build()
+ SparkPod(k8sPod, pod.container)
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index ec60c6fc0bf82..adbb5b296c9dc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -20,10 +20,14 @@ import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.features.KubernetesExecutorCustomFeatureConfigStep
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.resource.ResourceProfile
class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
+ val POD_ROLE: String = "executor"
+ val TEST_ANNOTATION_KEY: String = "executor-annotation-key"
+ val TEST_ANNOTATION_VALUE: String = "executor-annotation-value"
override protected def templateFileConf: ConfigEntry[_] = {
Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE
@@ -33,6 +37,14 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS
}
+ override protected def userFeatureStepWithExpectedAnnotation: (String, String) = {
+ ("org.apache.spark.scheduler.cluster.k8s.TestStepWithExecConf", TEST_ANNOTATION_VALUE)
+ }
+
+ override protected def wrongTypeFeatureStep: String = {
+ "org.apache.spark.deploy.k8s.submit.TestStepWithDrvConf"
+ }
+
override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = {
sparkConf.set("spark.driver.host", "https://driver.host.com")
val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
@@ -40,5 +52,27 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, defaultProfile).pod
}
+}
+/**
+ * A test executor user feature step would be used in only executor.
+ */
+class TestStepWithExecConf extends KubernetesExecutorCustomFeatureConfigStep {
+ import io.fabric8.kubernetes.api.model._
+
+ private var executorConf: KubernetesExecutorConf = _
+
+ def init(config: KubernetesExecutorConf): Unit = {
+ executorConf = config
+ }
+
+ override def configurePod(pod: SparkPod): SparkPod = {
+ val k8sPodBuilder = new PodBuilder(pod.pod)
+ .editOrNewMetadata()
+ // The annotation key = TEST_ANNOTATION_KEY, value = TEST_ANNOTATION_VALUE
+ .addToAnnotations("executor-annotation-key", executorConf.get("executor-annotation-key"))
+ .endMetadata()
+ val k8sPod = k8sPodBuilder.build()
+ SparkPod(k8sPod, pod.container)
+ }
}