diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index d8cb881bf082..4ebf31ae44ee 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -117,6 +117,7 @@ private[spark] class KubernetesDriverConf( override def annotations: Map[String, String] = { KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX) + .map { case (k, v) => (k, Utils.substituteAppNExecIds(v, appId, "")) } } def serviceLabels: Map[String, String] = { @@ -188,6 +189,7 @@ private[spark] class KubernetesExecutorConf( override def annotations: Map[String, String] = { KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) + .map { case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, executorId)) } } override def secretNamesToMountPaths: Map[String, String] = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 2b287ea85604..11a21bb68a6b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -143,8 +143,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) .editOrNewMetadata() .withName(driverPodName) .addToLabels(conf.labels.asJava) - .addToAnnotations(conf.annotations.map { case (k, v) => - (k, Utils.substituteAppNExecIds(v, conf.appId, "")) }.asJava) + .addToAnnotations(conf.annotations.asJava) .endMetadata() .editOrNewSpec() .withRestartPolicy("Never") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 0b0bbc30ba41..f3e5cad8c9e2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -255,14 +255,12 @@ private[spark] class BasicExecutorFeatureStep( case "statefulset" => "Always" case _ => "Never" } - val annotations = kubernetesConf.annotations.map { case (k, v) => - (k, Utils.substituteAppNExecIds(v, kubernetesConf.appId, kubernetesConf.executorId)) - } + val executorPodBuilder = new PodBuilder(pod.pod) .editOrNewMetadata() .withName(name) .addToLabels(kubernetesConf.labels.asJava) - .addToAnnotations(annotations.asJava) + .addToAnnotations(kubernetesConf.annotations.asJava) .addToOwnerReferences(ownerReference.toSeq: _*) .endMetadata() .editOrNewSpec() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index 3d310a831ea2..9963db016ad9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID +import org.apache.spark.util.Utils class KubernetesConfSuite extends SparkFunSuite { @@ -42,7 +43,9 @@ class KubernetesConfSuite extends SparkFunSuite { "customLabel2Key" -> "customLabel2Value") private val CUSTOM_ANNOTATIONS = Map( "customAnnotation1Key" -> "customAnnotation1Value", - "customAnnotation2Key" -> "customAnnotation2Value") + "customAnnotation2Key" -> "customAnnotation2Value", + "customAnnotation3Key" -> "{{APP_ID}}", + "customAnnotation4Key" -> "{{EXECUTOR_ID}}") private val SECRET_NAMES_TO_MOUNT_PATHS = Map( "secret1" -> "/mnt/secrets/secret1", "secret2" -> "/mnt/secrets/secret2") @@ -93,7 +96,9 @@ class KubernetesConfSuite extends SparkFunSuite { SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName), SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++ CUSTOM_LABELS) - assert(conf.annotations === CUSTOM_ANNOTATIONS) + assert(conf.annotations === CUSTOM_ANNOTATIONS.map { + case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, "")) + }) assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS) assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS) assert(conf.environment === CUSTOM_ENVS) @@ -161,7 +166,9 @@ class KubernetesConfSuite extends SparkFunSuite { SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE, SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ CUSTOM_LABELS) - assert(conf.annotations === CUSTOM_ANNOTATIONS) + assert(conf.annotations === CUSTOM_ANNOTATIONS.map { + case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, EXECUTOR_ID)) + }) assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS) assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS) }