diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index c28d6fd405ae1..40609aef1e9d8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -219,6 +219,26 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_DRIVER_POD_FEATURE_STEPS = + ConfigBuilder("spark.kubernetes.driver.pod.featureSteps") + .doc("Class names of an extra driver pod feature step implementing " + + "KubernetesFeatureConfigStep. This is a developer API. Comma separated. " + + "Runs after all of Spark internal feature steps.") + .version("3.2.0") + .stringConf + .toSequence + .createWithDefault(Nil) + + val KUBERNETES_EXECUTOR_POD_FEATURE_STEPS = + ConfigBuilder("spark.kubernetes.executor.pod.featureSteps") + .doc("Class name of an extra executor pod feature step implementing " + + "KubernetesFeatureConfigStep. This is a developer API. Comma separated. " + + "Runs after all of Spark internal feature steps.") + .version("3.2.0") + .stringConf + .toSequence + .createWithDefault(Nil) + val KUBERNETES_ALLOCATION_BATCH_SIZE = ConfigBuilder("spark.kubernetes.allocation.batch.size") .doc("Number of pods to launch at once in each round of executor allocation.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala index fd1196368a7ff..c2298e7ca77c6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala @@ -18,7 +18,16 @@ package org.apache.spark.deploy.k8s import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder} -private[spark] case class SparkPod(pod: Pod, container: Container) { +import org.apache.spark.annotation.{DeveloperApi, Unstable} + +/** + * :: DeveloperApi :: + * + * Represents a SparkPod consisting of pod and the container within the pod. + */ +@Unstable +@DeveloperApi +case class SparkPod(pod: Pod, container: Container) { /** * Convenience method to apply a series of chained transformations to a pod. diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala index 58cdaa3cadd6b..3fec92644b956 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala @@ -18,13 +18,18 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.HasMetadata +import org.apache.spark.annotation.{DeveloperApi, Unstable} import org.apache.spark.deploy.k8s.SparkPod /** + * :: DeveloperApi :: + * * A collection of functions that together represent a "feature" in pods that are launched for * Spark drivers and executors. */ -private[spark] trait KubernetesFeatureConfigStep { +@Unstable +@DeveloperApi +trait KubernetesFeatureConfigStep { /** * Apply modifications on the given pod in accordance to this feature. This can include attaching diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 43639a3b7dc1b..3b38dd6e4feef 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -22,6 +22,7 @@ import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ +import org.apache.spark.util.Utils private[spark] class KubernetesDriverBuilder { @@ -37,6 +38,11 @@ private[spark] class KubernetesDriverBuilder { } .getOrElse(SparkPod.initialPod()) + val userFeatures = conf.get(Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS) + .map { className => + Utils.classForName(className).newInstance().asInstanceOf[KubernetesFeatureConfigStep] + } + val features = Seq( new BasicDriverFeatureStep(conf), new DriverKubernetesCredentialsFeatureStep(conf), @@ -48,7 +54,7 @@ private[spark] class KubernetesDriverBuilder { new HadoopConfDriverFeatureStep(conf), new KerberosConfDriverFeatureStep(conf), new PodTemplateConfigMapStep(conf), - new LocalDirsFeatureStep(conf)) + new LocalDirsFeatureStep(conf)) ++ userFeatures val spec = KubernetesDriverSpec( initialPod, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 5388d185489f2..43328c72a6fdd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -24,6 +24,7 @@ import org.apache.spark.SecurityManager import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ import org.apache.spark.resource.ResourceProfile +import org.apache.spark.util.Utils private[spark] class KubernetesExecutorBuilder { @@ -41,13 +42,18 @@ private[spark] class KubernetesExecutorBuilder { } .getOrElse(SparkPod.initialPod()) + val userFeatures = conf.get(Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS) + .map { className => + Utils.classForName(className).newInstance().asInstanceOf[KubernetesFeatureConfigStep] + } + val features = Seq( new BasicExecutorFeatureStep(conf, secMgr, resourceProfile), new ExecutorKubernetesCredentialsFeatureStep(conf), new MountSecretsFeatureStep(conf), new EnvSecretsFeatureStep(conf), new MountVolumesFeatureStep(conf), - new LocalDirsFeatureStep(conf)) + new LocalDirsFeatureStep(conf)) ++ userFeatures val spec = KubernetesExecutorSpec( initialPod, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala index 4d4c4baeb12c0..21a5b7a6486fd 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala @@ -26,12 +26,15 @@ import org.mockito.Mockito.{mock, never, verify, when} import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep import org.apache.spark.internal.config.ConfigEntry abstract class PodBuilderSuite extends SparkFunSuite { protected def templateFileConf: ConfigEntry[_] + protected def userFeatureStepsConf: ConfigEntry[_] + protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod private val baseConf = new SparkConf(false) @@ -50,6 +53,19 @@ abstract class PodBuilderSuite extends SparkFunSuite { verifyPod(pod) } + test("configure a custom test step") { + val client = mockKubernetesClient() + val sparkConf = baseConf.clone() + .set(userFeatureStepsConf.key, + "org.apache.spark.deploy.k8s.TestStepTwo," + + "org.apache.spark.deploy.k8s.TestStep") + .set(templateFileConf.key, "template-file.yaml") + val pod = buildPod(sparkConf, client) + verifyPod(pod) + assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "so_long")) + assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "so_long_two")) + } + test("complain about misconfigured pod template") { val client = mockKubernetesClient( new PodBuilder() @@ -173,3 +189,63 @@ abstract class PodBuilderSuite extends SparkFunSuite { } } + +/** + * A test user feature step. + */ +class TestStep extends KubernetesFeatureConfigStep { + import io.fabric8.kubernetes.api.model._ + + override def configurePod(pod: SparkPod): SparkPod = { + val localDirVolumes = Seq(new VolumeBuilder().withName("so_long").build()) + val localDirVolumeMounts = Seq( + new VolumeMountBuilder().withName("so_long") + .withMountPath("and_thanks_for_all_the_fish") + .build() + ) + + val podWithLocalDirVolumes = new PodBuilder(pod.pod) + .editSpec() + .addToVolumes(localDirVolumes: _*) + .endSpec() + .build() + val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container) + .addNewEnv() + .withName("CUSTOM_SPARK_LOCAL_DIRS") + .withValue("fishyfishyfishy") + .endEnv() + .addToVolumeMounts(localDirVolumeMounts: _*) + .build() + SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) + } +} + +/** + * A test user feature step. + */ +class TestStepTwo extends KubernetesFeatureConfigStep { + import io.fabric8.kubernetes.api.model._ + + override def configurePod(pod: SparkPod): SparkPod = { + val localDirVolumes = Seq(new VolumeBuilder().withName("so_long_two").build()) + val localDirVolumeMounts = Seq( + new VolumeMountBuilder().withName("so_long_two") + .withMountPath("and_thanks_for_all_the_fish_eh") + .build() + ) + + val podWithLocalDirVolumes = new PodBuilder(pod.pod) + .editSpec() + .addToVolumes(localDirVolumes: _*) + .endSpec() + .build() + val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container) + .addNewEnv() + .withName("CUSTOM_SPARK_LOCAL_DIRS_TWO") + .withValue("fishyfishyfishyTWO") + .endEnv() + .addToVolumeMounts(localDirVolumeMounts: _*) + .build() + SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 6518c91a1a1fd..f9802ff967f82 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -28,9 +28,12 @@ class KubernetesDriverBuilderSuite extends PodBuilderSuite { Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE } + override protected def userFeatureStepsConf: ConfigEntry[_] = { + Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS + } + override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = { val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) new KubernetesDriverBuilder().buildFromFeatures(conf, client).pod } - } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index c64b733102dc8..ec60c6fc0bf82 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -29,6 +29,10 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite { Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE } + override protected def userFeatureStepsConf: ConfigEntry[_] = { + Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS + } + override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = { sparkConf.set("spark.driver.host", "https://driver.host.com") val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)