Config.scala
@@ -104,18 +104,18 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional

-  val KUBERNETES_DRIVER_POD_NAME =
-    ConfigBuilder("spark.kubernetes.driver.pod.name")
-      .doc("Name of the driver pod.")
+  val KUBERNETES_DRIVER_POD_NAME_PREFIX =
+    ConfigBuilder("spark.kubernetes.driver.pod.namePrefix")
+      .doc("Prefix to use in front of the driver pod names.")
       .stringConf
       .createOptional

-  val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
-    ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
-      .doc("Prefix to use in front of the executor pod names.")
-      .internal()
-      .stringConf
-      .createWithDefault("spark")
+  val KUBERNETES_DRIVER_JOB_BACKOFFLIMIT =
+    ConfigBuilder("spark.kubernetes.driver.job.backofflimit")
+      .doc("Number of retries before the driver job is marked as failed.")
+      .intConf
+      .checkValue(value => value > 0, "Backoff limit must be a positive number")
+      .createWithDefault(6)

   val KUBERNETES_PYSPARK_PY_FILES =
     ConfigBuilder("spark.kubernetes.python.pyFiles")
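For orientation, a minimal sketch of how the new entry would be read at a call site; the ConfigEntry accessor is standard Spark config plumbing, and the surrounding wiring is assumed:

    // Hedged sketch: resolving the driver job backoff limit.
    // createWithDefault(6) above makes this return 6 when the user sets nothing;
    // checkValue rejects non-positive values when the conf is set.
    val backoffLimit: Int = sparkConf.get(KUBERNETES_DRIVER_JOB_BACKOFFLIMIT)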
KubernetesConf.scala
@@ -217,7 +217,7 @@ private[spark] object KubernetesConf {
     KubernetesConf(
       sparkConf.clone(),
       KubernetesExecutorSpecificConf(executorId, driverPod),
-      sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
+      appId,
       appId,
       executorLabels,
       executorAnnotations,
KubernetesDriverSpec.scala
@@ -19,13 +19,13 @@ package org.apache.spark.deploy.k8s
 import io.fabric8.kubernetes.api.model.HasMetadata

 private[spark] case class KubernetesDriverSpec(
-    pod: SparkPod,
+    job: SparkJob,
     driverKubernetesResources: Seq[HasMetadata],
     systemProperties: Map[String, String])

 private[spark] object KubernetesDriverSpec {
   def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec(
-    SparkPod.initialPod(),
+    SparkJob.initialJob(),
     Seq.empty,
     initialProps)
 }
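A usage sketch (assumed call site) of the reworked spec: the initial spec now carries an empty Job skeleton instead of an empty Pod, which the feature steps then decorate.

    // Hypothetical seeding, mirroring what the submission client does with the
    // resolved system properties; the property map here is illustrative.
    val spec = KubernetesDriverSpec.initialSpec(Map("spark.app.name" -> "example"))
    // spec.job.job is the bare Job built by SparkJob.initialJob() below;
    // spec.driverKubernetesResources starts out empty.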
SparkJob.scala (new file)
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Job, JobBuilder}
+
+private[spark] case class SparkJob(job: Job, container: Container)
+
+private[spark] object SparkJob {
+  def initialJob(): SparkJob = {
+    SparkJob(
+      new JobBuilder()
+        .withNewMetadata()
+        .endMetadata()
+        .withNewSpec()
+        .endSpec()
+        .build(),
+      new ContainerBuilder().build())
+  }
+}
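To show how the wrapper is meant to be used, a hedged sketch of a feature step decorating the initial SparkJob in the same copy-and-rebuild style the existing steps use for SparkPod; the label and container name are illustrative, not from this PR:

    val base = SparkJob.initialJob()
    val decorated = SparkJob(
      new JobBuilder(base.job)
        .editOrNewMetadata()
          .addToLabels("spark-role", "driver") // illustrative label
          .endMetadata()
        .build(),
      new ContainerBuilder(base.container)
        .withName("spark-kubernetes-driver") // illustrative container name
        .build())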
BasicDriverFeatureStep.scala
@@ -32,8 +32,8 @@ private[spark] class BasicDriverFeatureStep(
     conf: KubernetesConf[KubernetesDriverSpecificConf])
   extends KubernetesFeatureConfigStep {

-  private val driverPodName = conf
-    .get(KUBERNETES_DRIVER_POD_NAME)
+  private val driverPodNamePrefix = conf
+    .get(KUBERNETES_DRIVER_POD_NAME_PREFIX)
     .getOrElse(s"${conf.appResourceNamePrefix}-driver")

   private val driverContainerImage = conf
@@ -93,7 +93,7 @@ private[spark] class BasicDriverFeatureStep(

     val driverPod = new PodBuilder(pod.pod)
       .editOrNewMetadata()
-        .withName(driverPodName)
+        .withName(driverPodNamePrefix)
         .addToLabels(conf.roleLabels.asJava)
         .addToAnnotations(conf.roleAnnotations.asJava)
         .endMetadata()
@@ -109,9 +109,8 @@ private[spark] class BasicDriverFeatureStep(

   override def getAdditionalPodSystemProperties(): Map[String, String] = {
     val additionalProps = mutable.Map(
-      KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
+      KUBERNETES_DRIVER_POD_NAME_PREFIX.key -> driverPodNamePrefix,
       "spark.app.id" -> conf.appId,
-      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix,
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true")

     val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
@@ -127,5 +126,4 @@ private[spark] class BasicDriverFeatureStep(
     additionalProps.toMap
   }
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }
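For intuition, how the renamed setting resolves (values illustrative, not from the diff): with spark.kubernetes.driver.pod.namePrefix unset and an app resource name prefix of spark-pi-1518, the driver Job is named spark-pi-1518-driver. Calling it a prefix is consistent with the move to Jobs, since the Job controller appends a generated suffix to the pods it spawns.

    // Same resolution as in the step above, shown standalone:
    val name = conf.get(KUBERNETES_DRIVER_POD_NAME_PREFIX)
      .getOrElse(s"${conf.appResourceNamePrefix}-driver") // e.g. "spark-pi-1518-driver"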
BasicExecutorFeatureStep.scala
@@ -42,7 +42,7 @@ private[spark] class BasicExecutorFeatureStep(
     .sparkConf
     .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)

-  private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix
+  private val executorPodNamePrefix = kubernetesConf.appId

   private val driverUrl = RpcEndpointAddress(
     kubernetesConf.get("spark.driver.host"),
@@ -176,8 +176,4 @@ private[spark] class BasicExecutorFeatureStep(

     SparkPod(executorPod, containerWithLimitCores)
   }
-
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }
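Since executors are now prefixed by the application ID rather than the resource name prefix, the resulting pod names look roughly like this (a sketch; the -exec- pattern matches this step's naming convention, the IDs are illustrative):

    // e.g. appId = "spark-application-1518" and executorId = "7"
    // yields "spark-application-1518-exec-7"
    val executorPodName = s"$executorPodNamePrefix-exec-$executorId" // assumed pattern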
EnvSecretsFeatureStep.scala
@@ -50,8 +50,4 @@ private[spark] class EnvSecretsFeatureStep(
       .build()
     SparkPod(pod.pod, containerWithEnvVars)
   }
-
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }
KubernetesFeatureConfigStep.scala
@@ -61,11 +61,11 @@ private[spark] trait KubernetesFeatureConfigStep {
   /**
    * Return any system properties that should be set on the JVM in accordance to this feature.
    */
-  def getAdditionalPodSystemProperties(): Map[String, String]
+  def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

   /**
    * Return any additional Kubernetes resources that should be added to support this feature. Only
    * applicable when creating the driver in cluster mode.
    */
-  def getAdditionalKubernetesResources(): Seq[HasMetadata]
+  def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }
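With these defaults in place, a feature step that only mutates the pod no longer needs to stub out the other two members; a minimal sketch (hypothetical step, not part of this PR):

    private[spark] class IdentityFeatureStep extends KubernetesFeatureConfigStep {
      // configurePod is now the only member that must be overridden.
      override def configurePod(pod: SparkPod): SparkPod = pod
    }

This is what allows the Map.empty and Seq.empty overrides to be deleted from the feature steps above and below.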
LocalDirsFeatureStep.scala
@@ -70,8 +70,4 @@ private[spark] class LocalDirsFeatureStep(
       .build()
     SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
   }
-
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }
MountSecretsFeatureStep.scala
@@ -54,9 +54,5 @@ private[spark] class MountSecretsFeatureStep(
     SparkPod(podWithVolumes, containerWithMounts)
   }

-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
-
   private def secretVolumeName(secretName: String): String = s"$secretName-volume"
 }
JavaDriverFeatureStep.scala
@@ -38,7 +38,4 @@ private[spark] class JavaDriverFeatureStep(
       .build()
     SparkPod(pod.pod, withDriverArgs)
   }
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }
PythonDriverFeatureStep.scala
@@ -67,7 +67,4 @@ private[spark] class PythonDriverFeatureStep(

     SparkPod(pod.pod, withPythonPrimaryContainer)
   }
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }
KubernetesClientApplication.scala
@@ -82,11 +82,11 @@ private[spark] object ClientArguments {
 }

 /**
- * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
+ * Submits a Spark application to run on Kubernetes by creating the driver job and starting a
  * watcher that monitors and logs the application status. Waits for the application to terminate if
  * spark.kubernetes.submission.waitAppCompletion is true.
  *
- * @param builder Responsible for building the base driver pod based on a composition of
+ * @param builder Responsible for building the base driver job based on a composition of
  *                implemented features.
  * @param kubernetesConf application configuration
  * @param kubernetesClient the client to talk to the Kubernetes API server
@@ -96,21 +96,21 @@ private[spark] object ClientArguments {
 private[spark] class Client(
-    builder: KubernetesDriverBuilder,
-    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf],
-    kubernetesClient: KubernetesClient,
-    waitForAppCompletion: Boolean,
-    appName: String,
-    watcher: LoggingPodStatusWatcher,
-    kubernetesResourceNamePrefix: String) extends Logging {
+  builder: KubernetesDriverBuilder,
+  kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf],
+  kubernetesClient: KubernetesClient,
+  waitForAppCompletion: Boolean,
+  appName: String,
+  watcher: LoggingJobStatusWatcher,
+  kubernetesResourceNamePrefix: String) extends Logging {

[Review comment, Contributor] Indentation.

   def run(): Unit = {
     val resolvedDriverSpec = builder.buildFromFeatures(kubernetesConf)
     val configMapName = s"$kubernetesResourceNamePrefix-driver-conf-map"
     val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)
     // The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the
     // Spark command builder to pickup on the Java Options present in the ConfigMap
-    val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container)
+    val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.job.container)
       .addNewEnv()
         .withName(ENV_SPARK_CONF_DIR)
         .withValue(SPARK_CONF_DIR_INTERNAL)
@@ -120,31 +120,41 @@ private[spark] class Client(
         .withMountPath(SPARK_CONF_DIR_INTERNAL)
         .endVolumeMount()
       .build()
-    val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
-      .editSpec()
-        .addToContainers(resolvedDriverContainer)
-        .addNewVolume()
-          .withName(SPARK_CONF_VOLUME)
-          .withNewConfigMap()
-            .withName(configMapName)
-            .endConfigMap()
-          .endVolume()
-        .endSpec()
+    val resolvedDriverJob = new JobBuilder(resolvedDriverSpec.job.job)
+      .editOrNewSpec()
+        .editOrNewTemplate()
+          .editOrNewSpec()
+            .addToContainers(resolvedDriverContainer)
+            .addNewVolume()
+              .withName(SPARK_CONF_VOLUME)
+              .withNewConfigMap()
+                .withName(configMapName)
+                .endConfigMap()
+              .endVolume()
+            .withRestartPolicy("OnFailure")
+            .endSpec()
+          .endTemplate()
+        .endSpec()
       .build()
+    // Until the fabric8 kubernetes client supports kubernetes 1.8, backoffLimit has to be
+    // injected as an additional property; once it does, this should be removed and replaced
+    // with a proper setter (https://github.com/fabric8io/kubernetes-client/issues/1020)
+    resolvedDriverJob.getSpec.setAdditionalProperty("backoffLimit",
+      kubernetesConf.get(KUBERNETES_DRIVER_JOB_BACKOFFLIMIT))
     Utils.tryWithResource(
-      kubernetesClient
-        .pods()
-        .withName(resolvedDriverPod.getMetadata.getName)
+      kubernetesClient.extensions()
+        .jobs()
+        .withName(resolvedDriverJob.getMetadata.getName)
         .watch(watcher)) { _ =>
-      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+      val createdDriverJob = kubernetesClient.extensions().jobs().create(resolvedDriverJob)
       try {
         val otherKubernetesResources =
           resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+        addDriverOwnerReference(createdDriverJob, otherKubernetesResources)
         kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
       } catch {
         case NonFatal(e) =>
-          kubernetesClient.pods().delete(createdDriverPod)
+          kubernetesClient.extensions().jobs().delete(createdDriverJob)
           throw e
       }

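The additional-property workaround survives serialization because the fabric8 models round-trip unknown fields. Once the client models backoffLimit natively, the same value could presumably be set through the builder; a hedged sketch of that future form (withBackoffLimit is assumed here, per the linked issue, and is not available in the client version this PR targets):

    // Assumed future API sketch, not part of this PR:
    val jobWithBackoff = new JobBuilder(resolvedDriverJob)
      .editSpec()
        .withBackoffLimit(kubernetesConf.get(KUBERNETES_DRIVER_JOB_BACKOFFLIMIT))
        .endSpec()
      .build()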
@@ -158,19 +168,19 @@ private[spark] class Client(
     }
   }

-  // Add a OwnerReference to the given resources making the driver pod an owner of them so when
-  // the driver pod is deleted, the resources are garbage collected.
-  private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
-    val driverPodOwnerReference = new OwnerReferenceBuilder()
-      .withName(driverPod.getMetadata.getName)
-      .withApiVersion(driverPod.getApiVersion)
-      .withUid(driverPod.getMetadata.getUid)
-      .withKind(driverPod.getKind)
+  // Add an OwnerReference to the given resources, making the driver job an owner of them so
+  // that when the driver job is deleted, the resources are garbage collected.
+  private def addDriverOwnerReference(driverJob: Job, resources: Seq[HasMetadata]): Unit = {
+    val driverJobOwnerReference = new OwnerReferenceBuilder()
+      .withName(driverJob.getMetadata.getName)
+      .withApiVersion(driverJob.getApiVersion)
+      .withUid(driverJob.getMetadata.getUid)
+      .withKind(driverJob.getKind)
       .withController(true)
       .build()
     resources.foreach { resource =>
       val originalMetadata = resource.getMetadata
-      originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
+      originalMetadata.setOwnerReferences(Collections.singletonList(driverJobOwnerReference))
     }
   }

@@ -229,9 +239,6 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     // The master URL has been checked for validity already in SparkSubmit.
     // We just need to get rid of the "k8s://" prefix here.
     val master = sparkConf.get("spark.master").substring("k8s://".length)
-    val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
-
-    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)

     Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
       master,
@@ -246,7 +253,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
         kubernetesClient,
         waitForAppCompletion,
         appName,
-        watcher,
+        new LoggingJobStatusWatcherImpl(kubernetesAppId, kubernetesClient),
         kubernetesResourceNamePrefix)
       client.run()
     }