From bddb23f789972b2c84a604aaa73976613fe2f3f1 Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Sun, 21 Apr 2024 16:32:18 +0800 Subject: [PATCH 1/2] [SPARK-47730][K8S] Support APP_ID and EXECUTOR_ID placeholders in labels Currently, only the pod annotations support `APP_ID` and `EXECUTOR_ID` placeholders. This commit aims to add the same capability to pod labels. The use case is to support using customized labels for availability-zone-based topology pod affinity. We want to use the Spark application ID as the customized label value, to allow Spark executor pods to run in the same availability zone as the Spark driver pod. Although we can use the Spark internal label `spark-app-selector` directly, this is not a good practice when using it along with YuniKorn Gang Scheduling. When Gang Scheduling is enabled, the YuniKorn placeholder pods should use the same affinity as the real Spark pods. As a result, we would have to add the internal `spark-app-selector` label to the placeholder pods. This is undesirable because the placeholder pods could then be mistaken for Spark pods in the monitoring system. Thus we propose supporting the `APP_ID` and `EXECUTOR_ID` placeholders in Spark pod labels as well for flexibility. 
--- .../apache/spark/deploy/k8s/KubernetesConf.scala | 10 ++++++---- .../spark/deploy/k8s/KubernetesConfSuite.scala | 13 ++++++++++--- .../k8s/features/BasicDriverFeatureStepSuite.scala | 9 ++++++--- .../k8s/integrationtest/BasicTestsSuite.scala | 2 ++ .../k8s/integrationtest/KubernetesSuite.scala | 6 ++++-- 5 files changed, 28 insertions(+), 12 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index a1ef04f4e311..b55f9317d10b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -100,8 +100,9 @@ private[spark] class KubernetesDriverConf( SPARK_APP_ID_LABEL -> appId, SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(appName), SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) - val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) + val driverCustomLabels = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) + .map { case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, "")) } presetLabels.keys.foreach { key => require( @@ -173,8 +174,9 @@ private[spark] class KubernetesExecutorConf( SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE, SPARK_RESOURCE_PROFILE_ID_LABEL -> resourceProfileId.toString) - val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX) + val executorCustomLabels = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX) + .map { case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, executorId)) } presetLabels.keys.foreach { key => require( diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index 9963db016ad9..3c53e9b74f92 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -40,7 +40,9 @@ class KubernetesConfSuite extends SparkFunSuite { "execNodeSelectorKey2" -> "execNodeSelectorValue2") private val CUSTOM_LABELS = Map( "customLabel1Key" -> "customLabel1Value", - "customLabel2Key" -> "customLabel2Value") + "customLabel2Key" -> "customLabel2Value", + "customLabel3Key" -> "{{APP_ID}}", + "customLabel4Key" -> "{{EXECUTOR_ID}}") private val CUSTOM_ANNOTATIONS = Map( "customAnnotation1Key" -> "customAnnotation1Value", "customAnnotation2Key" -> "customAnnotation2Value", @@ -95,7 +97,9 @@ class KubernetesConfSuite extends SparkFunSuite { SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID, SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName), SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++ - CUSTOM_LABELS) + CUSTOM_LABELS.map { + case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, "")) + }) assert(conf.annotations === CUSTOM_ANNOTATIONS.map { case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, "")) }) @@ -165,7 +169,10 @@ class KubernetesConfSuite extends SparkFunSuite { SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID, SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE, - SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ CUSTOM_LABELS) + SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ + CUSTOM_LABELS.map { + case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, EXECUTOR_ID)) + }) assert(conf.annotations === 
CUSTOM_ANNOTATIONS.map { case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, EXECUTOR_ID)) }) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index f102851e6c3b..22dbc05a7323 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -35,7 +35,9 @@ import org.apache.spark.util.Utils class BasicDriverFeatureStepSuite extends SparkFunSuite { - private val CUSTOM_DRIVER_LABELS = Map("labelkey" -> "labelvalue") + private val CUSTOM_DRIVER_LABELS = Map( + "labelkey" -> "labelvalue", + "yunikorn.apache.org/app-id" -> "{{APPID}}") private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent" private val DRIVER_ANNOTATIONS = Map( "customAnnotation" -> "customAnnotationValue", @@ -121,10 +123,11 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(driverPodMetadata.getName === "spark-driver-pod") // Check custom and preset labels are as expected + val labels = driverPodMetadata.getLabels CUSTOM_DRIVER_LABELS.foreach { case (k, v) => - assert(driverPodMetadata.getLabels.get(k) === v) + assert(labels.get(k) === Utils.substituteAppNExecIds(v, KubernetesTestConf.APP_ID, "")) } - assert(driverPodMetadata.getLabels === kubernetesConf.labels.asJava) + assert(labels === kubernetesConf.labels.asJava) val annotations = driverPodMetadata.getAnnotations.asScala DRIVER_ANNOTATIONS.foreach { case (k, v) => diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala index d6911aadfa23..87c82c06d6d4 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala @@ -102,6 +102,7 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite => sparkAppConf .set("spark.kubernetes.driver.label.label1", "label1-value") .set("spark.kubernetes.driver.label.label2", "label2-value") + .set("spark.kubernetes.driver.label.yunikorn.apache.org/app-id", "{{APP_ID}}") .set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value") .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value") .set("spark.kubernetes.driver.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}") @@ -109,6 +110,7 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite => .set("spark.kubernetes.driverEnv.ENV2", "VALUE2") .set("spark.kubernetes.executor.label.label1", "label1-value") .set("spark.kubernetes.executor.label.label2", "label2-value") + .set("spark.kubernetes.executor.label.yunikorn.apache.org/app-id", "{{APP_ID}}") .set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value") .set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value") .set("spark.kubernetes.executor.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}") diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 868461fd5b9e..614c82010d9f 100644 --- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -589,7 +589,8 @@ class KubernetesSuite extends SparkFunSuite assert(pod.getMetadata.getLabels.get("label2") === "label2-value") assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value") assert(pod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value") - val appId = pod.getMetadata.getAnnotations.get("yunikorn.apache.org/app-id") + val appIdLabel = pod.getMetadata.getLabels.get("yunikorn.apache.org/app-id") + val appIdAnnotation = pod.getMetadata.getAnnotations.get("yunikorn.apache.org/app-id") val container = pod.getSpec.getContainers.get(0) val envVars = container @@ -601,7 +602,8 @@ class KubernetesSuite extends SparkFunSuite .toMap assert(envVars("ENV1") === "VALUE1") assert(envVars("ENV2") === "VALUE2") - assert(appId === envVars(ENV_APPLICATION_ID)) + assert(appIdLabel === envVars(ENV_APPLICATION_ID)) + assert(appIdAnnotation === envVars(ENV_APPLICATION_ID)) } private def deleteDriverPod(): Unit = { From 528539cd6229d6583652d9b17a5e8bbf25cffc08 Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Sat, 27 Apr 2024 17:09:08 +0800 Subject: [PATCH 2/2] Address comments to use a general label key in unit tests. Also fix the typo value `APPID` -> `APP_ID` in `BasicDriverFeatureStepSuite`. 
--- .../deploy/k8s/features/BasicDriverFeatureStepSuite.scala | 4 ++-- .../deploy/k8s/integrationtest/BasicTestsSuite.scala | 8 ++++---- .../deploy/k8s/integrationtest/KubernetesSuite.scala | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 22dbc05a7323..bf022ac63015 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -37,11 +37,11 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { private val CUSTOM_DRIVER_LABELS = Map( "labelkey" -> "labelvalue", - "yunikorn.apache.org/app-id" -> "{{APPID}}") + "customAppIdLabelKey" -> "{{APP_ID}}") private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent" private val DRIVER_ANNOTATIONS = Map( "customAnnotation" -> "customAnnotationValue", - "yunikorn.apache.org/app-id" -> "{{APPID}}") + "customAppIdAnnotation" -> "{{APP_ID}}") private val DRIVER_ENVS = Map( "customDriverEnv1" -> "customDriverEnv1Value", "customDriverEnv2" -> "customDriverEnv2Value") diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala index 87c82c06d6d4..0dafe30c364a 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala @@ -102,18 +102,18 @@ 
private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite => sparkAppConf .set("spark.kubernetes.driver.label.label1", "label1-value") .set("spark.kubernetes.driver.label.label2", "label2-value") - .set("spark.kubernetes.driver.label.yunikorn.apache.org/app-id", "{{APP_ID}}") + .set("spark.kubernetes.driver.label.customAppIdLabelKey", "{{APP_ID}}") .set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value") .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value") - .set("spark.kubernetes.driver.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}") + .set("spark.kubernetes.driver.annotation.customAppIdAnnotation", "{{APP_ID}}") .set("spark.kubernetes.driverEnv.ENV1", "VALUE1") .set("spark.kubernetes.driverEnv.ENV2", "VALUE2") .set("spark.kubernetes.executor.label.label1", "label1-value") .set("spark.kubernetes.executor.label.label2", "label2-value") - .set("spark.kubernetes.executor.label.yunikorn.apache.org/app-id", "{{APP_ID}}") + .set("spark.kubernetes.executor.label.customAppIdLabelKey", "{{APP_ID}}") .set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value") .set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value") - .set("spark.kubernetes.executor.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}") + .set("spark.kubernetes.executor.annotation.customAppIdAnnotation", "{{APP_ID}}") .set("spark.executorEnv.ENV1", "VALUE1") .set("spark.executorEnv.ENV2", "VALUE2") diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 614c82010d9f..0b0b30e5e04f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -589,8 +589,8 @@ class KubernetesSuite extends SparkFunSuite assert(pod.getMetadata.getLabels.get("label2") === "label2-value") assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value") assert(pod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value") - val appIdLabel = pod.getMetadata.getLabels.get("yunikorn.apache.org/app-id") - val appIdAnnotation = pod.getMetadata.getAnnotations.get("yunikorn.apache.org/app-id") + val appIdLabel = pod.getMetadata.getLabels.get("customAppIdLabelKey") + val appIdAnnotation = pod.getMetadata.getAnnotations.get("customAppIdAnnotation") val container = pod.getSpec.getContainers.get(0) val envVars = container