diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index f4b362b164ef9..baa10a6e3c709 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -20,6 +20,7 @@ private[spark] object Constants { // Labels val SPARK_APP_ID_LABEL = "spark-app-selector" + val SPARK_APP_NAME_LABEL = "spark-app-name" val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id" val SPARK_RESOURCE_PROFILE_ID_LABEL = "spark-exec-resourceprofile-id" val SPARK_ROLE_LABEL = "spark-role" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 0eef6e1ca9f78..b94de8878b5ac 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s import java.util.{Locale, UUID} import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod} +import org.apache.commons.lang3.StringUtils import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ @@ -94,6 +95,7 @@ private[spark] class KubernetesDriverConf( override def labels: Map[String, String] = { val presetLabels = Map( SPARK_APP_ID_LABEL -> appId, + SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(appName), SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) @@ -155,6 +157,7 @@ private[spark] class KubernetesExecutorConf( val presetLabels = Map( SPARK_EXECUTOR_ID_LABEL -> executorId, SPARK_APP_ID_LABEL -> appId, + SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(appName), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE, SPARK_RESOURCE_PROFILE_ID_LABEL -> resourceProfileId.toString) @@ -248,6 +251,21 @@ private[spark] object KubernetesConf { .replaceAll("-+", "-") } + def getAppNameLabel(appName: String): String = { + // According to https://kubernetes.io/docs/concepts/overview/working-with-objects/labels, + // must be 63 characters or less to follow the DNS label standard, so take the 63 characters + // of the appName name as the label. + StringUtils.abbreviate( + s"$appName" + .trim + .toLowerCase(Locale.ROOT) + .replaceAll("[^a-z0-9\\-]", "-") + .replaceAll("-+", "-"), + "", + KUBERNETES_DNSNAME_MAX_LENGTH + ) + } + /** * Build a resources name based on the vendor device plugin naming * convention of: vendor-domain/resource. For example, an NVIDIA GPU is diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 925f9dc93a26d..49681dc8191c2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -142,6 +142,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) .editOrNewMetadata() .withName(driverPodName) .addToLabels(conf.labels.asJava) + .addToLabels(SPARK_APP_NAME_LABEL, KubernetesConf.getAppNameLabel(conf.appName)) .addToAnnotations(conf.annotations.asJava) .endMetadata() .editOrNewSpec() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 4d5646897e7bb..3f0a21e72ffbf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -277,6 +277,10 @@ private[spark] class BasicExecutorFeatureStep( .withName(name) .addToLabels(kubernetesConf.labels.asJava) .addToLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, resourceProfile.id.toString) + .addToLabels( + SPARK_APP_NAME_LABEL, + KubernetesConf.getAppNameLabel(kubernetesConf.appName) + ) .addToAnnotations(kubernetesConf.annotations.asJava) .addToOwnerReferences(ownerReference.toSeq: _*) .endMetadata() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index f5cfd8e69e23a..f6b6f76031e7b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -89,6 +89,7 @@ class KubernetesConfSuite extends SparkFunSuite { None) assert(conf.labels === Map( SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID, + SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName), SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++ CUSTOM_LABELS) assert(conf.annotations === CUSTOM_ANNOTATIONS) @@ -155,6 +156,7 @@ class KubernetesConfSuite extends SparkFunSuite { assert(conf.labels === Map( SPARK_EXECUTOR_ID_LABEL -> EXECUTOR_ID, SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID, + SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE, SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ CUSTOM_LABELS) assert(conf.annotations === CUSTOM_ANNOTATIONS) @@ -213,4 +215,11 @@ class KubernetesConfSuite extends SparkFunSuite { val driverConf = KubernetesTestConf.createDriverConf(sparkConf) assert(driverConf.schedulerName === "driverScheduler") } + + test("SPARK-36566: get app name label") { + assert(KubernetesConf.getAppNameLabel(" Job+Spark-Pi 2021") === "job-spark-pi-2021") + assert(KubernetesConf.getAppNameLabel("a" * 63) === "a" * 63) + assert(KubernetesConf.getAppNameLabel("a" * 64) === "a" * 63) + assert(KubernetesConf.getAppNameLabel("a" * 253) === "a" * 63) + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 2ee5b2f08ded6..9e52c6ef6ccf1 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder, Quantity} import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesTestConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation @@ -116,7 +116,10 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val driverPodMetadata = configuredPod.pod.getMetadata assert(driverPodMetadata.getName === "spark-driver-pod") - DRIVER_LABELS.foreach { case (k, v) => + val DEFAULT_LABELS = Map( + SPARK_APP_NAME_LABEL-> KubernetesConf.getAppNameLabel(kubernetesConf.appName) + ) + (DRIVER_LABELS ++ DEFAULT_LABELS).foreach { case (k, v) => assert(driverPodMetadata.getLabels.get(k) === v) } assert(driverPodMetadata.getAnnotations.asScala === DRIVER_ANNOTATIONS) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index e59772dc6b34a..b0e7a34a4732f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -27,7 +27,7 @@ import io.fabric8.kubernetes.api.model._ import org.scalatest.BeforeAndAfter import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SecretVolumeUtils, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorConf, KubernetesTestConf, SecretVolumeUtils, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation @@ -149,13 +149,17 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } test("basic executor pod has reasonable defaults") { - val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf), + val conf = newExecutorConf() + val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf), defaultProfile) val executor = step.configurePod(SparkPod.initialPod()) // The executor pod name and default labels. assert(executor.pod.getMetadata.getName === s"$RESOURCE_NAME_PREFIX-exec-1") - LABELS.foreach { case (k, v) => + val DEFAULT_LABELS = Map( + SPARK_APP_NAME_LABEL-> KubernetesConf.getAppNameLabel(conf.appName) + ) + (LABELS ++ DEFAULT_LABELS).foreach { case (k, v) => assert(executor.pod.getMetadata.getLabels.get(k) === v) } assert(executor.pod.getSpec.getImagePullSecrets.asScala === TEST_IMAGE_PULL_SECRET_OBJECTS)