diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index f2104d433ad4..3b2b5612566a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -142,7 +142,8 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       .editOrNewMetadata()
         .withName(driverPodName)
         .addToLabels(conf.labels.asJava)
-        .addToAnnotations(conf.annotations.asJava)
+        .addToAnnotations(conf.annotations.map { case (k, v) =>
+          (k, Utils.substituteAppNExecIds(v, conf.appId, "")) }.asJava)
         .endMetadata()
       .editOrNewSpec()
         .withRestartPolicy("Never")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index c6084720c56f..a7625194bd6e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -272,11 +272,14 @@ private[spark] class BasicExecutorFeatureStep(
       case "statefulset" => "Always"
       case _ => "Never"
     }
+    val annotations = kubernetesConf.annotations.map { case (k, v) =>
+      (k, Utils.substituteAppNExecIds(v, kubernetesConf.appId, kubernetesConf.executorId))
+    }
     val executorPodBuilder = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(name)
         .addToLabels(kubernetesConf.labels.asJava)
-        .addToAnnotations(kubernetesConf.annotations.asJava)
+        .addToAnnotations(annotations.asJava)
         .addToOwnerReferences(ownerReference.toSeq: _*)
         .endMetadata()
       .editOrNewSpec()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 83444e5518e3..0b54599bd1d3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder, Quantity}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation
@@ -36,7 +36,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
   private val CUSTOM_DRIVER_LABELS = Map("labelkey" -> "labelvalue")
   private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
-  private val DRIVER_ANNOTATIONS = Map("customAnnotation" -> "customAnnotationValue")
+  private val DRIVER_ANNOTATIONS = Map(
+    "customAnnotation" -> "customAnnotationValue",
+    "yunikorn.apache.org/app-id" -> "{{APP_ID}}")
   private val DRIVER_ENVS = Map(
     "customDriverEnv1" -> "customDriverEnv2",
     "customDriverEnv2" -> "customDriverEnv2")
@@ -62,7 +64,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       sparkConf.set(testRInfo.rId.amountConf, testRInfo.count)
       sparkConf.set(testRInfo.rId.vendorConf, testRInfo.vendor)
     }
-    val kubernetesConf = KubernetesTestConf.createDriverConf(
+    val kubernetesConf: KubernetesDriverConf = KubernetesTestConf.createDriverConf(
       sparkConf = sparkConf,
       labels = CUSTOM_DRIVER_LABELS,
       environment = DRIVER_ENVS,
@@ -123,7 +125,10 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       assert(driverPodMetadata.getLabels.get(k) === v)
     }
     assert(driverPodMetadata.getLabels === kubernetesConf.labels.asJava)
-    assert(driverPodMetadata.getAnnotations.asScala === DRIVER_ANNOTATIONS)
+    val annotations = driverPodMetadata.getAnnotations.asScala
+    DRIVER_ANNOTATIONS.foreach { case (k, v) =>
+      assert(annotations(k) === Utils.substituteAppNExecIds(v, KubernetesTestConf.APP_ID, ""))
+    }
     assert(configuredPod.pod.getSpec.getRestartPolicy === "Never")
     val expectedSparkConf = Map(
       KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 0e79f6c55440..a79442ac6358 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -82,12 +82,14 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
       .set("spark.kubernetes.driver.label.label2", "label2-value")
       .set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value")
       .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value")
+      .set("spark.kubernetes.driver.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}")
       .set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
       .set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
       .set("spark.kubernetes.executor.label.label1", "label1-value")
       .set("spark.kubernetes.executor.label.label2", "label2-value")
       .set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value")
       .set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value")
+      .set("spark.kubernetes.executor.annotation.yunikorn.apache.org/app-id", "{{APP_ID}}")
       .set("spark.executorEnv.ENV1", "VALUE1")
       .set("spark.executorEnv.ENV2", "VALUE2")
 
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 69b736951301..c7834e9e7c37 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.matchers.should.Matchers._
 import org.scalatest.time.{Minutes, Seconds, Span}
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.Constants.ENV_APPLICATION_ID
 import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
 import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
 import org.apache.spark.internal.Logging
@@ -556,6 +557,7 @@ class KubernetesSuite extends SparkFunSuite
     assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
     assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value")
     assert(pod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value")
+    val appId = pod.getMetadata.getAnnotations.get("yunikorn.apache.org/app-id")
 
     val container = pod.getSpec.getContainers.get(0)
     val envVars = container
@@ -567,6 +569,7 @@ class KubernetesSuite extends SparkFunSuite
       .toMap
     assert(envVars("ENV1") === "VALUE1")
     assert(envVars("ENV2") === "VALUE2")
+    assert(appId === envVars(ENV_APPLICATION_ID))
   }
 
   private def deleteDriverPod(): Unit = {