diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 7930cd0ce156..406c84279f0c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -684,6 +684,15 @@ private[spark] object Config extends Logging {
       .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
       .createWithDefault(Int.MaxValue)
 
+  private[spark] val KUBERNETES_EXECUTOR_POD_ANTI_AFFINITY =
+    ConfigBuilder("spark.kubernetes.executor.pod.antiAffinity.enable")
+      .doc("If enabled, executor pods are created with a preferred pod anti-affinity. " +
+        "This anti-affinity helps Kubernetes assign executors of the same application " +
+        "to different nodes as much as possible.")
+      .version("3.2.1")
+      .booleanConf
+      .createWithDefault(false)
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = "spark.kubernetes.driver.service.annotation."
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/AntiAffinityFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/AntiAffinityFeatureStep.scala
new file mode 100644
index 000000000000..52136edc2018
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/AntiAffinityFeatureStep.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+
+import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_POD_ANTI_AFFINITY
+import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL
+import org.apache.spark.internal.Logging
+
+/**
+ * Executor feature step that adds a preferred pod anti-affinity term to the executor pod
+ * when `spark.kubernetes.executor.pod.antiAffinity.enable` is true. The term targets pods
+ * whose application id label equals this application's id, with
+ * `kubernetes.io/hostname` as the topology key.
+ *
+ * NOTE(review): the selector matches on the application id label only. Confirm whether
+ * non-executor pods of the application carry that label and should be excluded.
+ */
+class AntiAffinityFeatureStep(kubernetesConf: KubernetesExecutorConf)
+  extends KubernetesFeatureConfigStep with Logging {
+
+  import AntiAffinityFeatureStep._
+
+  private val enableExecutorPodAntiAffinity =
+    kubernetesConf.get(KUBERNETES_EXECUTOR_POD_ANTI_AFFINITY)
+
+  /**
+   * Returns `pod` untouched when the feature is disabled. Otherwise returns a new
+   * [[SparkPod]] with the same container and a pod spec that carries one additional
+   * preferred anti-affinity term.
+   */
+  override def configurePod(pod: SparkPod): SparkPod = {
+    if (enableExecutorPodAntiAffinity) {
+      // The editOrNew*/addNew* builder calls extend the affinity section of the incoming
+      // pod rather than replacing it.
+      val podWithAntiAffinity = new PodBuilder(pod.pod)
+        .editOrNewSpec()
+          .editOrNewAffinity()
+            .editOrNewPodAntiAffinity()
+              .addNewPreferredDuringSchedulingIgnoredDuringExecution()
+                .withWeight(PREFERRED_ANTI_AFFINITY_WEIGHT)
+                .editOrNewPodAffinityTerm()
+                  .withTopologyKey(ANTI_AFFINITY_TOPOLOGY_KEY)
+                  .editOrNewLabelSelector()
+                    .addNewMatchExpression()
+                      .withKey(SPARK_APP_ID_LABEL)
+                      .withOperator("In")
+                      .withValues(kubernetesConf.appId)
+                    .endMatchExpression()
+                  .endLabelSelector()
+                .endPodAffinityTerm()
+              .endPreferredDuringSchedulingIgnoredDuringExecution()
+            .endPodAntiAffinity()
+          .endAffinity()
+        .endSpec()
+        .build()
+      SparkPod(podWithAntiAffinity, pod.container)
+    } else {
+      pod
+    }
+  }
+}
+
+private object AntiAffinityFeatureStep {
+  /** Weight of the preferred anti-affinity term; Kubernetes accepts weights from 1 to 100. */
+  val PREFERRED_ANTI_AFFINITY_WEIGHT = 100
+
+  /** Topology key of the anti-affinity term. */
+  val ANTI_AFFINITY_TOPOLOGY_KEY = "kubernetes.io/hostname"
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 1f6d72cb7eee..9e7bfad0aedb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -71,7 +71,8 @@ private[spark] class KubernetesExecutorBuilder {
       new MountSecretsFeatureStep(conf),
       new EnvSecretsFeatureStep(conf),
       new MountVolumesFeatureStep(conf),
-      new LocalDirsFeatureStep(conf)) ++ userFeatures
+      new LocalDirsFeatureStep(conf),
+      new AntiAffinityFeatureStep(conf)) ++ userFeatures
 
     val spec = KubernetesExecutorSpec(
       initialPod,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/AntiAffinityFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/AntiAffinityFeatureStepSuite.scala
new file mode 100644
index 000000000000..28a703d44fbf
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/AntiAffinityFeatureStepSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_POD_ANTI_AFFINITY
+import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL
+
+/** Unit tests for [[AntiAffinityFeatureStep]]: one for the enabled path, one for the default. */
+class AntiAffinityFeatureStepSuite extends SparkFunSuite {
+
+  test("enable executor pod antiAffinity") {
+    val conf = new SparkConf()
+      .set(KUBERNETES_EXECUTOR_POD_ANTI_AFFINITY.key, "true")
+    val executorConf = KubernetesTestConf.createExecutorConf(conf)
+    val sparkPod =
+      new AntiAffinityFeatureStep(executorConf).configurePod(SparkPod.initialPod())
+
+    val podAntiAffinity = sparkPod.pod.getSpec.getAffinity.getPodAntiAffinity
+    assert(podAntiAffinity != null)
+
+    val preferredTerms = podAntiAffinity.getPreferredDuringSchedulingIgnoredDuringExecution
+    assert(preferredTerms != null)
+    assert(preferredTerms.size() === 1)
+    assert(preferredTerms.get(0).getWeight.intValue() === 100)
+
+    val affinityTerm = preferredTerms.get(0).getPodAffinityTerm
+    assert(affinityTerm != null)
+    assert(affinityTerm.getTopologyKey === "kubernetes.io/hostname")
+    assert(affinityTerm.getLabelSelector != null)
+
+    val matchExpressions = affinityTerm.getLabelSelector.getMatchExpressions
+    assert(matchExpressions != null)
+    assert(matchExpressions.size() === 1)
+    assert(matchExpressions.get(0).getKey === SPARK_APP_ID_LABEL)
+    assert(matchExpressions.get(0).getOperator === "In")
+    assert(matchExpressions.get(0).getValues.size() === 1)
+    assert(matchExpressions.get(0).getValues.get(0) === KubernetesTestConf.APP_ID)
+  }
+
+  test("executor pod antiAffinity is disabled by default") {
+    val executorConf = KubernetesTestConf.createExecutorConf(new SparkConf(false))
+    val initialPod = SparkPod.initialPod()
+    val sparkPod = new AntiAffinityFeatureStep(executorConf).configurePod(initialPod)
+
+    assert(sparkPod === initialPod)
+  }
+}