diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 4467f73e70568..37edc81991ac2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -404,6 +404,14 @@ private[spark] object Config extends Logging { .toSequence .createWithDefault(Nil) + val KUBERNETES_DRIVER_POD_EXCLUDED_FEATURE_STEPS = + ConfigBuilder("spark.kubernetes.driver.pod.excludedFeatureSteps") + .doc("Class names to exclude from driver pod feature steps. Comma separated.") + .version("4.1.0") + .stringConf + .toSequence + .createWithDefault(Nil) + val KUBERNETES_EXECUTOR_POD_FEATURE_STEPS = ConfigBuilder("spark.kubernetes.executor.pod.featureSteps") .doc("Class name of an extra executor pod feature step implementing " + @@ -416,6 +424,14 @@ private[spark] object Config extends Logging { .toSequence .createWithDefault(Nil) + val KUBERNETES_EXECUTOR_POD_EXCLUDED_FEATURE_STEPS = + ConfigBuilder("spark.kubernetes.executor.pod.excludedFeatureSteps") + .doc("Class name to exclude from executor pod feature steps. Comma separated.") + .version("4.1.0") + .stringConf + .toSequence + .createWithDefault(Nil) + val KUBERNETES_EXECUTOR_DECOMMISSION_LABEL = ConfigBuilder("spark.kubernetes.executor.decommissionLabel") .doc("Label to apply to a pod which is being decommissioned." + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 12626a8676efe..da234762ea1de 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -72,7 +72,7 @@ class KubernetesDriverBuilder { } } - val features = Seq( + val allFeatures = Seq( new BasicDriverFeatureStep(conf), new DriverKubernetesCredentialsFeatureStep(conf), new DriverServiceFeatureStep(conf), @@ -85,6 +85,9 @@ class KubernetesDriverBuilder { new PodTemplateConfigMapStep(conf), new LocalDirsFeatureStep(conf)) ++ userFeatures + val features = allFeatures.filterNot(f => + conf.get(Config.KUBERNETES_DRIVER_POD_EXCLUDED_FEATURE_STEPS).contains(f.getClass.getName)) + val spec = KubernetesDriverSpec( initialPod, driverPreKubernetesResources = Seq.empty, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index a85e42662b890..2253c07df116e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -65,7 +65,7 @@ private[spark] class KubernetesExecutorBuilder { } } - val features = Seq( + val allFeatures = Seq( new BasicExecutorFeatureStep(conf, secMgr, resourceProfile), new ExecutorKubernetesCredentialsFeatureStep(conf), new MountSecretsFeatureStep(conf), @@ -74,6 +74,9 @@ private[spark] class KubernetesExecutorBuilder { new HadoopConfExecutorFeatureStep(conf), new LocalDirsFeatureStep(conf)) ++ userFeatures + val features = allFeatures.filterNot(f => + conf.get(Config.KUBERNETES_EXECUTOR_POD_EXCLUDED_FEATURE_STEPS).contains(f.getClass.getName)) + val spec = KubernetesExecutorSpec( initialPod, executorKubernetesResources = Seq.empty) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala index 947db5dd41c1a..a803819432378 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala @@ -40,6 +40,8 @@ abstract class PodBuilderSuite extends SparkFunSuite { protected def roleSpecificSchedulerNameConf: ConfigEntry[_] + protected def excludedFeatureStepsConf: ConfigEntry[_] + protected def userFeatureStepsConf: ConfigEntry[_] protected def userFeatureStepWithExpectedAnnotation: (String, String) @@ -91,6 +93,21 @@ abstract class PodBuilderSuite extends SparkFunSuite { assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "so_long_two")) } + test("SPARK-52830: exclude a feature step") { + val client = mockKubernetesClient() + val sparkConf = baseConf.clone() + .set(excludedFeatureStepsConf.key, + "org.apache.spark.deploy.k8s.TestStepTwo") + .set(userFeatureStepsConf.key, + "org.apache.spark.deploy.k8s.TestStepTwo," + + "org.apache.spark.deploy.k8s.TestStep") + .set(templateFileConf.key, "template-file.yaml") + val pod = buildPod(sparkConf, client) + verifyPod(pod) + assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "so_long")) + assert(!pod.container.getVolumeMounts.asScala.exists(_.getName == "so_long_two")) + } + test("SPARK-37145: configure a custom test step with base config") { val client = mockKubernetesClient() val sparkConf = baseConf.clone() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 861b8e0fff943..d3fd5ee5f00c4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -38,6 +38,10 @@ class KubernetesDriverBuilderSuite extends PodBuilderSuite { Config.KUBERNETES_DRIVER_SCHEDULER_NAME } + override protected def excludedFeatureStepsConf: ConfigEntry[_] = { + Config.KUBERNETES_DRIVER_POD_EXCLUDED_FEATURE_STEPS + } + override protected def userFeatureStepsConf: ConfigEntry[_] = { Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index 17c2d4a938c14..5f0f04da9196b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -42,6 +42,10 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite { Config.KUBERNETES_EXECUTOR_SCHEDULER_NAME } + override protected def excludedFeatureStepsConf: ConfigEntry[_] = { + Config.KUBERNETES_EXECUTOR_POD_EXCLUDED_FEATURE_STEPS + } + override protected def userFeatureStepsConf: ConfigEntry[_] = { Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS }