Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,26 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_DRIVER_POD_FEATURE_STEPS =
ConfigBuilder("spark.kubernetes.driver.pod.featureSteps")
.doc("Class names of an extra driver pod feature step implementing " +
"KubernetesFeatureConfigStep. This is a developer API. Comma separated. " +
"Runs after all of Spark internal feature steps.")
.version("3.2.0")
.stringConf
.toSequence
.createWithDefault(Nil)

val KUBERNETES_EXECUTOR_POD_FEATURE_STEPS =
ConfigBuilder("spark.kubernetes.executor.pod.featureSteps")
.doc("Class name of an extra executor pod feature step implementing " +
"KubernetesFeatureConfigStep. This is a developer API. Comma separated. " +
"Runs after all of Spark internal feature steps.")
.version("3.2.0")
.stringConf
.toSequence
.createWithDefault(Nil)

val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@ package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

private[spark] case class SparkPod(pod: Pod, container: Container) {
import org.apache.spark.annotation.{DeveloperApi, Unstable}

/**
* :: DeveloperApi ::
*
* Represents a SparkPod consisting of pod and the container within the pod.
*/
@Unstable
@DeveloperApi
case class SparkPod(pod: Pod, container: Container) {

/**
* Convenience method to apply a series of chained transformations to a pod.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ package org.apache.spark.deploy.k8s.features

import io.fabric8.kubernetes.api.model.HasMetadata

import org.apache.spark.annotation.{DeveloperApi, Unstable}
import org.apache.spark.deploy.k8s.SparkPod

/**
* :: DeveloperApi ::
*
* A collection of functions that together represent a "feature" in pods that are launched for
* Spark drivers and executors.
*/
private[spark] trait KubernetesFeatureConfigStep {
@Unstable
@DeveloperApi
trait KubernetesFeatureConfigStep {

/**
* Apply modifications on the given pod in accordance to this feature. This can include attaching
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.util.Utils

private[spark] class KubernetesDriverBuilder {

Expand All @@ -37,6 +38,11 @@ private[spark] class KubernetesDriverBuilder {
}
.getOrElse(SparkPod.initialPod())

val userFeatures = conf.get(Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS)
.map { className =>
Utils.classForName(className).newInstance().asInstanceOf[KubernetesFeatureConfigStep]
}

val features = Seq(
new BasicDriverFeatureStep(conf),
new DriverKubernetesCredentialsFeatureStep(conf),
Expand All @@ -48,7 +54,7 @@ private[spark] class KubernetesDriverBuilder {
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf),
new LocalDirsFeatureStep(conf))
new LocalDirsFeatureStep(conf)) ++ userFeatures

val spec = KubernetesDriverSpec(
initialPod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.SecurityManager
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.util.Utils

private[spark] class KubernetesExecutorBuilder {

Expand All @@ -41,13 +42,18 @@ private[spark] class KubernetesExecutorBuilder {
}
.getOrElse(SparkPod.initialPod())

val userFeatures = conf.get(Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS)
.map { className =>
Utils.classForName(className).newInstance().asInstanceOf[KubernetesFeatureConfigStep]
}

val features = Seq(
new BasicExecutorFeatureStep(conf, secMgr, resourceProfile),
new ExecutorKubernetesCredentialsFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new LocalDirsFeatureStep(conf))
new LocalDirsFeatureStep(conf)) ++ userFeatures

val spec = KubernetesExecutorSpec(
initialPod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ import org.mockito.Mockito.{mock, never, verify, when}
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
import org.apache.spark.internal.config.ConfigEntry

abstract class PodBuilderSuite extends SparkFunSuite {

protected def templateFileConf: ConfigEntry[_]

protected def userFeatureStepsConf: ConfigEntry[_]

protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod

private val baseConf = new SparkConf(false)
Expand All @@ -50,6 +53,19 @@ abstract class PodBuilderSuite extends SparkFunSuite {
verifyPod(pod)
}

test("configure a custom test step") {
val client = mockKubernetesClient()
val sparkConf = baseConf.clone()
.set(userFeatureStepsConf.key,
"org.apache.spark.deploy.k8s.TestStepTwo," +
"org.apache.spark.deploy.k8s.TestStep")
.set(templateFileConf.key, "template-file.yaml")
val pod = buildPod(sparkConf, client)
verifyPod(pod)
assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "so_long"))
assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "so_long_two"))
}

test("complain about misconfigured pod template") {
val client = mockKubernetesClient(
new PodBuilder()
Expand Down Expand Up @@ -173,3 +189,63 @@ abstract class PodBuilderSuite extends SparkFunSuite {
}

}

/**
* A test user feature step.
*/
class TestStep extends KubernetesFeatureConfigStep {
import io.fabric8.kubernetes.api.model._

override def configurePod(pod: SparkPod): SparkPod = {
val localDirVolumes = Seq(new VolumeBuilder().withName("so_long").build())
val localDirVolumeMounts = Seq(
new VolumeMountBuilder().withName("so_long")
.withMountPath("and_thanks_for_all_the_fish")
.build()
)

val podWithLocalDirVolumes = new PodBuilder(pod.pod)
.editSpec()
.addToVolumes(localDirVolumes: _*)
.endSpec()
.build()
val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
.addNewEnv()
.withName("CUSTOM_SPARK_LOCAL_DIRS")
.withValue("fishyfishyfishy")
.endEnv()
.addToVolumeMounts(localDirVolumeMounts: _*)
.build()
SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
}
}

/**
* A test user feature step.
*/
class TestStepTwo extends KubernetesFeatureConfigStep {
import io.fabric8.kubernetes.api.model._

override def configurePod(pod: SparkPod): SparkPod = {
val localDirVolumes = Seq(new VolumeBuilder().withName("so_long_two").build())
val localDirVolumeMounts = Seq(
new VolumeMountBuilder().withName("so_long_two")
.withMountPath("and_thanks_for_all_the_fish_eh")
.build()
)

val podWithLocalDirVolumes = new PodBuilder(pod.pod)
.editSpec()
.addToVolumes(localDirVolumes: _*)
.endSpec()
.build()
val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
.addNewEnv()
.withName("CUSTOM_SPARK_LOCAL_DIRS_TWO")
.withValue("fishyfishyfishyTWO")
.endEnv()
.addToVolumeMounts(localDirVolumeMounts: _*)
.build()
SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ class KubernetesDriverBuilderSuite extends PodBuilderSuite {
Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE
}

override protected def userFeatureStepsConf: ConfigEntry[_] = {
Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS
}

override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = {
val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
new KubernetesDriverBuilder().buildFromFeatures(conf, client).pod
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE
}

override protected def userFeatureStepsConf: ConfigEntry[_] = {
Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS
}

override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = {
sparkConf.set("spark.driver.host", "https://driver.host.com")
val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
Expand Down