diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index a1ef04f4e311..b55f9317d10b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -100,8 +100,9 @@ private[spark] class KubernetesDriverConf( SPARK_APP_ID_LABEL -> appId, SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(appName), SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) - val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) + val driverCustomLabels = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) + .map { case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, "")) } presetLabels.keys.foreach { key => require( @@ -173,8 +174,9 @@ private[spark] class KubernetesExecutorConf( SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE, SPARK_RESOURCE_PROFILE_ID_LABEL -> resourceProfileId.toString) - val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX) + val executorCustomLabels = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX) + .map { case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, executorId)) } presetLabels.keys.foreach { key => require( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index 9963db016ad9..3c53e9b74f92 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -40,7 +40,9 @@ class KubernetesConfSuite extends SparkFunSuite { "execNodeSelectorKey2" -> "execNodeSelectorValue2") private val CUSTOM_LABELS = Map( "customLabel1Key" -> "customLabel1Value", - "customLabel2Key" -> "customLabel2Value") + "customLabel2Key" -> "customLabel2Value", + "customLabel3Key" -> "{{APP_ID}}", + "customLabel4Key" -> "{{EXECUTOR_ID}}") private val CUSTOM_ANNOTATIONS = Map( "customAnnotation1Key" -> "customAnnotation1Value", "customAnnotation2Key" -> "customAnnotation2Value", @@ -95,7 +97,9 @@ class KubernetesConfSuite extends SparkFunSuite { SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID, SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName), SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++ - CUSTOM_LABELS) + CUSTOM_LABELS.map { + case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, "")) + }) assert(conf.annotations === CUSTOM_ANNOTATIONS.map { case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, "")) }) @@ -165,7 +169,10 @@ class KubernetesConfSuite extends SparkFunSuite { SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID, SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE, - SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ CUSTOM_LABELS) + SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ + CUSTOM_LABELS.map { + case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, EXECUTOR_ID)) + }) assert(conf.annotations === CUSTOM_ANNOTATIONS.map { case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, EXECUTOR_ID)) }) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index f102851e6c3b..bf022ac63015 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -35,11 +35,13 @@ import org.apache.spark.util.Utils class BasicDriverFeatureStepSuite extends SparkFunSuite { - private val CUSTOM_DRIVER_LABELS = Map("labelkey" -> "labelvalue") + private val CUSTOM_DRIVER_LABELS = Map( + "labelkey" -> "labelvalue", + "customAppIdLabelKey" -> "{{APP_ID}}") private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent" private val DRIVER_ANNOTATIONS = Map( "customAnnotation" -> "customAnnotationValue", - "yunikorn.apache.org/app-id" -> "{{APPID}}") + "customAppIdAnnotation" -> "{{APP_ID}}") private val DRIVER_ENVS = Map( "customDriverEnv1" -> "customDriverEnv1Value", "customDriverEnv2" -> "customDriverEnv2Value") @@ -121,10 +123,11 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(driverPodMetadata.getName === "spark-driver-pod") // Check custom and preset labels are as expected + val labels = driverPodMetadata.getLabels CUSTOM_DRIVER_LABELS.foreach { case (k, v) => - assert(driverPodMetadata.getLabels.get(k) === v) + assert(labels.get(k) === Utils.substituteAppNExecIds(v, KubernetesTestConf.APP_ID, "")) } - assert(driverPodMetadata.getLabels === kubernetesConf.labels.asJava) + assert(labels === kubernetesConf.labels.asJava) val annotations = driverPodMetadata.getAnnotations.asScala DRIVER_ANNOTATIONS.foreach { case (k, v) => diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala index d6911aadfa23..0dafe30c364a 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala @@ -102,16 +102,18 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite => sparkAppConf .set("spark.kubernetes.driver.label.label1", "label1-value") .set("spark.kubernetes.driver.label.label2", "label2-value") + .set("spark.kubernetes.driver.label.customAppIdLabelKey", "{{APP_ID}}") .set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value") .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value") - .set("spark.kubernetes.driver.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}") + .set("spark.kubernetes.driver.annotation.customAppIdAnnotation", "{{APP_ID}}") .set("spark.kubernetes.driverEnv.ENV1", "VALUE1") .set("spark.kubernetes.driverEnv.ENV2", "VALUE2") .set("spark.kubernetes.executor.label.label1", "label1-value") .set("spark.kubernetes.executor.label.label2", "label2-value") + .set("spark.kubernetes.executor.label.customAppIdLabelKey", "{{APP_ID}}") .set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value") .set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value") - .set("spark.kubernetes.executor.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}") + .set("spark.kubernetes.executor.annotation.customAppIdAnnotation", "{{APP_ID}}") .set("spark.executorEnv.ENV1", "VALUE1") .set("spark.executorEnv.ENV2", "VALUE2") diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 868461fd5b9e..0b0b30e5e04f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -589,7 +589,8 @@ class KubernetesSuite extends SparkFunSuite assert(pod.getMetadata.getLabels.get("label2") === "label2-value") assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value") assert(pod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value") - val appId = pod.getMetadata.getAnnotations.get("yunikorn.apache.org/app-id") + val appIdLabel = pod.getMetadata.getLabels.get("customAppIdLabelKey") + val appIdAnnotation = pod.getMetadata.getAnnotations.get("customAppIdAnnotation") val container = pod.getSpec.getContainers.get(0) val envVars = container @@ -601,7 +602,8 @@ class KubernetesSuite extends SparkFunSuite .toMap assert(envVars("ENV1") === "VALUE1") assert(envVars("ENV2") === "VALUE2") - assert(appId === envVars(ENV_APPLICATION_ID)) + assert(appIdLabel === envVars(ENV_APPLICATION_ID)) + assert(appIdAnnotation === envVars(ENV_APPLICATION_ID)) } private def deleteDriverPod(): Unit = {