Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
import org.apache.spark.internal.config.ConfigEntry


Expand Down Expand Up @@ -220,10 +221,20 @@ private[spark] object KubernetesConf {
val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)

// If no prefix is defined then we are in pure client mode
// (not the one used by cluster mode inside the container)
val appResourceNamePrefix = {
if (sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) {
getResourceNamePrefix(getAppName(sparkConf))
} else {
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
}
}

KubernetesConf(
sparkConf.clone(),
KubernetesExecutorSpecificConf(executorId, driverPod),
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
appResourceNamePrefix,
appId,
executorLabels,
executorAnnotations,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
// considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
// a unique app ID (captured by spark.app.id) in the format below.
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
val launchTime = System.currentTimeMillis()
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val kubernetesResourceNamePrefix = {
s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
}
val kubernetesResourceNamePrefix = KubernetesClientApplication.getResourceNamePrefix(appName)
sparkConf.set(KUBERNETES_PYSPARK_PY_FILES, clientArguments.maybePyFiles.getOrElse(""))
val kubernetesConf = KubernetesConf.createDriverConf(
sparkConf,
Expand Down Expand Up @@ -254,3 +251,19 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
}
}
}

private[spark] object KubernetesClientApplication {

def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark")

def getResourceNamePrefix(appName: String): String = {
val launchTime = System.currentTimeMillis()
s"$appName-$launchTime"
.trim
.toLowerCase
.replaceAll("\\s+", "-")
.replaceAll("\\.", "-")
.replaceAll("[^a-z0-9\\-]", "")
.replaceAll("-+", "-")
}

@skonto skonto Sep 12, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reduce dashes. for example:

scala> " Spark   #$ d #.Pi"
.trim
.toLowerCase
.replaceAll("\\s+", "-")
.replaceAll("\\.", "-")
.replaceAll("[^a-z0-9\\-]", "")
.replaceAll("-+", "-")
res15: String = spark-d-pi

}
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,23 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
executorSpecificConf.executorId,
TEST_SPARK_APP_ID,
Some(driverPod))
k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&

// Set prefixes to a common string since KUBERNETES_EXECUTOR_POD_NAME_PREFIX
// has not be set for the tests and thus KubernetesConf will use a random
// string for the prefix, based on the app name, and this comparison here will fail.
val k8sConfCopy = k8sConf
.copy(appResourceNamePrefix = "")
.copy(sparkConf = conf)
val expectedK8sConfCopy = expectedK8sConf
.copy(appResourceNamePrefix = "")
.copy(sparkConf = conf)

k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
// Since KubernetesConf.createExecutorConf clones the SparkConf object, force
// deep equality comparison for the SparkConf object and use object equality
// comparison on all other fields.
k8sConf.copy(sparkConf = conf) == expectedK8sConf.copy(sparkConf = conf)
k8sConfCopy == expectedK8sConfCopy
}
}
})

}