From e28c2daaff2f950246c896f0dda5f28f4f4d15b4 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Wed, 12 Sep 2018 22:47:57 +0300 Subject: [PATCH] fix executor names collision --- .../spark/deploy/k8s/KubernetesConf.scala | 13 +++++++++++- .../submit/KubernetesClientApplication.scala | 21 +++++++++++++++---- .../k8s/ExecutorPodsAllocatorSuite.scala | 16 +++++++++++--- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 3aa35d419073f..cae6e7d5ad518 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._ import org.apache.spark.internal.config.ConfigEntry @@ -220,10 +221,20 @@ private[spark] object KubernetesConf { val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix( sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get) + // If no prefix is defined then we are in pure client mode + // (not the one used by cluster mode inside the container) + val appResourceNamePrefix = { + if (sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) { + getResourceNamePrefix(getAppName(sparkConf)) + } else { + sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + } + } + KubernetesConf( sparkConf.clone(), KubernetesExecutorSpecificConf(executorId, driverPod), - sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX), + appResourceNamePrefix, appId, executorLabels, executorAnnotations, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 986c950ab365a..edeaa380194ac 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -211,11 +211,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication { // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate // a unique app ID (captured by spark.app.id) in the format below. val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}" - val launchTime = System.currentTimeMillis() val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION) - val kubernetesResourceNamePrefix = { - s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-") - } + val kubernetesResourceNamePrefix = KubernetesClientApplication.getResourceNamePrefix(appName) sparkConf.set(KUBERNETES_PYSPARK_PY_FILES, clientArguments.maybePyFiles.getOrElse("")) val kubernetesConf = KubernetesConf.createDriverConf( sparkConf, @@ -254,3 +251,19 @@ private[spark] class KubernetesClientApplication extends SparkApplication { } } } + +private[spark] object KubernetesClientApplication { + + def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark") + + def getResourceNamePrefix(appName: String): String = { + val launchTime = System.currentTimeMillis() + s"$appName-$launchTime" + .trim + .toLowerCase + .replaceAll("\\s+", "-") + .replaceAll("\\.", "-") + .replaceAll("[^a-z0-9\\-]", "") + .replaceAll("-+", "-") + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index e847f8590d353..0e617b0021019 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -167,13 +167,23 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { executorSpecificConf.executorId, TEST_SPARK_APP_ID, Some(driverPod)) - k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap && + + // Set prefixes to a common string since KUBERNETES_EXECUTOR_POD_NAME_PREFIX + // has not be set for the tests and thus KubernetesConf will use a random + // string for the prefix, based on the app name, and this comparison here will fail. + val k8sConfCopy = k8sConf + .copy(appResourceNamePrefix = "") + .copy(sparkConf = conf) + val expectedK8sConfCopy = expectedK8sConf + .copy(appResourceNamePrefix = "") + .copy(sparkConf = conf) + + k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap && // Since KubernetesConf.createExecutorConf clones the SparkConf object, force // deep equality comparison for the SparkConf object and use object equality // comparison on all other fields. - k8sConf.copy(sparkConf = conf) == expectedK8sConf.copy(sparkConf = conf) + k8sConfCopy == expectedK8sConfCopy } } }) - }