Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -1420,7 +1420,9 @@ See the [configuration page](configuration.html) for information on Spark config
<td>
Class names of an extra driver pod feature step implementing
`KubernetesFeatureConfigStep`. This is a developer API. Comma separated.
Runs after all of Spark internal feature steps.
Runs after all of Spark internal feature steps. Since 3.3.0, your driver feature step
can implement `KubernetesDriverCustomFeatureConfigStep` where the driver config
is also available.
</td>
<td>3.2.0</td>
</tr>
Expand All @@ -1430,7 +1432,9 @@ See the [configuration page](configuration.html) for information on Spark config
<td>
Class names of an extra executor pod feature step implementing
`KubernetesFeatureConfigStep`. This is a developer API. Comma separated.
Runs after all of Spark internal feature steps.
Runs after all of Spark internal feature steps. Since 3.3.0, your executor feature step
can implement `KubernetesExecutorCustomFeatureConfigStep` where the executor config
is also available.
</td>
<td>3.2.0</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,9 @@ private[spark] object Config extends Logging {
ConfigBuilder("spark.kubernetes.driver.pod.featureSteps")
.doc("Class names of an extra driver pod feature step implementing " +
"KubernetesFeatureConfigStep. This is a developer API. Comma separated. " +
"Runs after all of Spark internal feature steps.")
"Runs after all of Spark internal feature steps. Since 3.3.0, your driver feature " +
"step can implement `KubernetesDriverCustomFeatureConfigStep` where the driver " +
"config is also available.")
.version("3.2.0")
.stringConf
.toSequence
Expand All @@ -351,7 +353,9 @@ private[spark] object Config extends Logging {
ConfigBuilder("spark.kubernetes.executor.pod.featureSteps")
.doc("Class name of an extra executor pod feature step implementing " +
"KubernetesFeatureConfigStep. This is a developer API. Comma separated. " +
"Runs after all of Spark internal feature steps.")
"Runs after all of Spark internal feature steps. Since 3.3.0, your executor feature " +
"step can implement `KubernetesExecutorCustomFeatureConfigStep` where the executor " +
"config is also available.")
.version("3.2.0")
.stringConf
.toSequence
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import org.apache.spark.annotation.{DeveloperApi, Unstable}
import org.apache.spark.deploy.k8s.KubernetesDriverConf

/**
* :: DeveloperApi ::
*
* A base interface to help user extend custom feature step in driver side.
* Note: If your custom feature step would be used only in driver or both in driver and executor,
* please use this.
*/
@Unstable
@DeveloperApi
trait KubernetesDriverCustomFeatureConfigStep extends KubernetesFeatureConfigStep {
/**
* Initialize the configuration for driver user feature step, this only applicable when user
* specified `spark.kubernetes.driver.pod.featureSteps`, the init will be called after feature
* step loading.
*/
def init(config: KubernetesDriverConf): Unit
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import org.apache.spark.annotation.{DeveloperApi, Unstable}
import org.apache.spark.deploy.k8s.KubernetesExecutorConf

/**
* :: DeveloperApi ::
*
* A base interface to help user extend custom feature step in executor side.
* Note: If your custom feature step would be used only in driver or both in driver and executor,
* please use this.
*/
@Unstable
@DeveloperApi
trait KubernetesExecutorCustomFeatureConfigStep extends KubernetesFeatureConfigStep {
/**
* Initialize the configuration for executor user feature step, this only applicable when user
* specified `spark.kubernetes.executor.pod.featureSteps` the init will be called after feature
* step loading.
*/
def init(config: KubernetesExecutorConf): Unit
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.submit

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.util.Utils
Expand All @@ -39,7 +40,26 @@ private[spark] class KubernetesDriverBuilder {

val userFeatures = conf.get(Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS)
.map { className =>
Utils.classForName(className).newInstance().asInstanceOf[KubernetesFeatureConfigStep]
val feature = Utils.classForName[Any](className).newInstance()
val initializedFeature = feature match {
// Since 3.3, allow user to implement feature with KubernetesDriverConf
case d: KubernetesDriverCustomFeatureConfigStep =>
d.init(conf)
Some(d)
// raise SparkException with wrong type feature step
case _: KubernetesExecutorCustomFeatureConfigStep =>
None
// Since 3.2, allow user to implement feature without config
case f: KubernetesFeatureConfigStep =>
Some(f)
case _ => None
}
initializedFeature.getOrElse {
throw new SparkException(s"Failed to initialize feature step: $className, " +
s"please make sure your driver side feature steps are implemented by " +
s"`${classOf[KubernetesDriverCustomFeatureConfigStep].getName}` or " +
s"`${classOf[KubernetesFeatureConfigStep].getName}`.")
}
}

val features = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SecurityManager
import org.apache.spark.{SecurityManager, SparkException}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.resource.ResourceProfile
Expand All @@ -43,7 +43,26 @@ private[spark] class KubernetesExecutorBuilder {

val userFeatures = conf.get(Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS)
.map { className =>
Utils.classForName(className).newInstance().asInstanceOf[KubernetesFeatureConfigStep]
val feature = Utils.classForName[Any](className).newInstance()
val initializedFeature = feature match {
// Since 3.3, allow user to implement feature with KubernetesExecutorConf
case e: KubernetesExecutorCustomFeatureConfigStep =>
e.init(conf)
Some(e)
// raise SparkException with wrong type feature step
case _: KubernetesDriverCustomFeatureConfigStep =>
None
// Since 3.2, allow user to implement feature without config
case f: KubernetesFeatureConfigStep =>
Some(f)
case _ => None
}
initializedFeature.getOrElse {
throw new SparkException(s"Failed to initialize feature step: $className, " +
s"please make sure your executor side feature steps are implemented by " +
s"`${classOf[KubernetesExecutorCustomFeatureConfigStep].getSimpleName}` or " +
s"`${classOf[KubernetesFeatureConfigStep].getSimpleName}`.")
}
}

val features = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,22 @@ import org.mockito.Mockito.{mock, never, verify, when}
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
import org.apache.spark.deploy.k8s.features.{KubernetesDriverCustomFeatureConfigStep, KubernetesExecutorCustomFeatureConfigStep, KubernetesFeatureConfigStep}
import org.apache.spark.internal.config.ConfigEntry

abstract class PodBuilderSuite extends SparkFunSuite {
val POD_ROLE: String
val TEST_ANNOTATION_KEY: String
val TEST_ANNOTATION_VALUE: String

protected def templateFileConf: ConfigEntry[_]

protected def userFeatureStepsConf: ConfigEntry[_]

protected def userFeatureStepWithExpectedAnnotation: (String, String)

protected def wrongTypeFeatureStep: String

protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod

protected val baseConf = new SparkConf(false)
Expand Down Expand Up @@ -66,6 +73,57 @@ abstract class PodBuilderSuite extends SparkFunSuite {
assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "so_long_two"))
}

test("SPARK-37145: configure a custom test step with base config") {
val client = mockKubernetesClient()
val sparkConf = baseConf.clone()
.set(userFeatureStepsConf.key,
"org.apache.spark.deploy.k8s.TestStepWithConf")
.set(templateFileConf.key, "template-file.yaml")
.set("test-features-key", "test-features-value")
val pod = buildPod(sparkConf, client)
verifyPod(pod)
val metadata = pod.pod.getMetadata
assert(metadata.getAnnotations.containsKey("test-features-key"))
assert(metadata.getAnnotations.get("test-features-key") === "test-features-value")
}

test("SPARK-37145: configure a custom test step with driver or executor config") {
val client = mockKubernetesClient()
val (featureSteps, annotation) = userFeatureStepWithExpectedAnnotation
val sparkConf = baseConf.clone()
.set(templateFileConf.key, "template-file.yaml")
.set(userFeatureStepsConf.key, featureSteps)
.set(TEST_ANNOTATION_KEY, annotation)
val pod = buildPod(sparkConf, client)
verifyPod(pod)
val metadata = pod.pod.getMetadata
assert(metadata.getAnnotations.containsKey(TEST_ANNOTATION_KEY))
assert(metadata.getAnnotations.get(TEST_ANNOTATION_KEY) === annotation)
}

test("SPARK-37145: configure a custom test step with wrong type config") {
val client = mockKubernetesClient()
val sparkConf = baseConf.clone()
.set(templateFileConf.key, "template-file.yaml")
.set(userFeatureStepsConf.key, wrongTypeFeatureStep)
val e = intercept[SparkException] {
buildPod(sparkConf, client)
}
assert(e.getMessage.contains(s"please make sure your $POD_ROLE side feature steps"))
}

test("SPARK-37145: configure a custom test step with wrong name") {
val client = mockKubernetesClient()
val featureSteps = "unknow.class"
val sparkConf = baseConf.clone()
.set(templateFileConf.key, "template-file.yaml")
.set(userFeatureStepsConf.key, featureSteps)
val e = intercept[ClassNotFoundException] {
buildPod(sparkConf, client)
}
assert(e.getMessage.contains("unknow.class"))
}

test("complain about misconfigured pod template") {
val client = mockKubernetesClient(
new PodBuilder()
Expand Down Expand Up @@ -249,3 +307,30 @@ class TestStepTwo extends KubernetesFeatureConfigStep {
SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
}
}

/**
* A test user feature step would be used in driver and executor.
*/
class TestStepWithConf extends KubernetesDriverCustomFeatureConfigStep
with KubernetesExecutorCustomFeatureConfigStep {
import io.fabric8.kubernetes.api.model._

private var kubernetesConf: KubernetesConf = _

override def init(conf: KubernetesDriverConf): Unit = {
kubernetesConf = conf
}

override def init(conf: KubernetesExecutorConf): Unit = {
kubernetesConf = conf
}

override def configurePod(pod: SparkPod): SparkPod = {
val k8sPodBuilder = new PodBuilder(pod.pod)
.editOrNewMetadata()
.addToAnnotations("test-features-key", kubernetesConf.get("test-features-key"))
.endMetadata()
val k8sPod = k8sPodBuilder.build()
SparkPod(k8sPod, pod.container)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
import org.apache.spark.deploy.k8s.features.{KubernetesDriverCustomFeatureConfigStep, KubernetesFeatureConfigStep}
import org.apache.spark.internal.config.ConfigEntry

class KubernetesDriverBuilderSuite extends PodBuilderSuite {
val POD_ROLE: String = "driver"
val TEST_ANNOTATION_KEY: String = "driver-annotation-key"
val TEST_ANNOTATION_VALUE: String = "driver-annotation-value"

override protected def templateFileConf: ConfigEntry[_] = {
Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE
Expand All @@ -35,6 +38,14 @@ class KubernetesDriverBuilderSuite extends PodBuilderSuite {
Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS
}

override protected def userFeatureStepWithExpectedAnnotation: (String, String) = {
("org.apache.spark.deploy.k8s.submit.TestStepWithDrvConf", TEST_ANNOTATION_VALUE)
}

override protected def wrongTypeFeatureStep: String = {
"org.apache.spark.scheduler.cluster.k8s.TestStepWithExecConf"
}

override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = {
val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
new KubernetesDriverBuilder().buildFromFeatures(conf, client).pod
Expand Down Expand Up @@ -82,3 +93,27 @@ class TestStep extends KubernetesFeatureConfigStep {
.build()
)
}


/**
* A test driver user feature step would be used in only driver.
*/
class TestStepWithDrvConf extends KubernetesDriverCustomFeatureConfigStep {
import io.fabric8.kubernetes.api.model._

private var driverConf: KubernetesDriverConf = _

override def init(config: KubernetesDriverConf): Unit = {
driverConf = config
}

override def configurePod(pod: SparkPod): SparkPod = {
val k8sPodBuilder = new PodBuilder(pod.pod)
.editOrNewMetadata()
// The annotation key = TEST_ANNOTATION_KEY, value = TEST_ANNOTATION_VALUE
.addToAnnotations("driver-annotation-key", driverConf.get("driver-annotation-key"))
.endMetadata()
val k8sPod = k8sPodBuilder.build()
SparkPod(k8sPod, pod.container)
}
}
Loading