Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
<module>external/kafka-0-10-assembly</module>
<module>external/kafka-0-10-sql</module>
<module>external/avro</module>
<module>resource-managers/kubernetes/core</module>
<!-- See additional modules enabled by profiles below -->
</modules>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,19 @@ private[spark] object Config extends Logging {
.booleanConf
.createWithDefault(true)

// Kind of Kubernetes resource used to launch the driver. Only the kinds listed in
// `allowedKinds` are accepted; a plain "Pod" is used when the key is not set.
val KUBERNETES_DRIVER_POD_KIND = {
  val allowedKinds = Set("Pod", "Job", "Deployment")
  ConfigBuilder("spark.kubernetes.driver.pod.kind")
    .doc("Specify the kind of driver's pod on Kubernetes.")
    .stringConf
    .checkValues(allowedKinds)
    .createWithDefault("Pod")
}

// Retry budget passed to the driver Job's `backoffLimit` when the driver is submitted as a
// Job. Kubernetes does not accept a negative backoffLimit, so reject such values when the
// configuration is read instead of letting the API server fail the submission later.
val KUBERNETES_JOB_BACKOFFLIMIT =
  ConfigBuilder("spark.kubernetes.job.backofflimit")
    .doc("Specify backoffLimit of job.")
    .intConf
    .checkValue(_ >= 0, "The job backoffLimit must be a non-negative integer.")
    .createWithDefault(3)

// Key prefixes for driver-scoped settings. Each one ends with a dot, so the remainder of a
// matching key presumably names the individual label / annotation / secret -- confirm against
// the code that reads these prefixes.
val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import java.util.{Collections, UUID}
import java.util.Properties

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.apps.{Deployment, DeploymentBuilder}
import io.fabric8.kubernetes.api.model.batch.{Job, JobBuilder}
import io.fabric8.kubernetes.client.KubernetesClient

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication
import org.apache.spark.deploy.k8s._
Expand All @@ -33,6 +35,8 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils



/**
* Encapsulates arguments to the submission client.
*
Expand Down Expand Up @@ -130,20 +134,65 @@ private[spark] class Client(
.endVolume()
.endSpec()
.build()
val driverPodName = resolvedDriverPod.getMetadata.getName

val driverPodKind = conf.sparkConf.get(KUBERNETES_DRIVER_POD_KIND)

Utils.tryWithResource(
kubernetesClient
.pods()
.withName(resolvedDriverPod.getMetadata.getName)
.watch(watcher)) { _ =>
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)

var createdDriverPod: HasMetadata = null

if(driverPodKind.equals("Deployment")) {
val resolvedDeploymentPod = new DeploymentBuilder()
.editOrNewMetadata()
.withName(s"$driverPodName")
.endMetadata()
.withNewSpec()
.withReplicas(1)
.editOrNewTemplate()
.withMetadata(resolvedDriverPod.getMetadata)
.editOrNewSpecLike(resolvedDriverPod.getSpec)
.withRestartPolicy("Always")
.endSpec()
.endTemplate()
.endSpec()
.build()
createdDriverPod = kubernetesClient.extensions().deployments().create(resolvedDeploymentPod)

} else if (driverPodKind.equals("Pod")) {
createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
} else {
val resolvedJobPod = new JobBuilder()
.withMetadata(resolvedDriverPod.getMetadata)
.withNewSpec()
.withBackoffLimit(conf.sparkConf.get(KUBERNETES_JOB_BACKOFFLIMIT))
.editOrNewTemplate()
.withMetadata(resolvedDriverPod.getMetadata)
.withSpec(resolvedDriverPod.getSpec)
.endTemplate()
.endSpec()
.build()
createdDriverPod = kubernetesClient.extensions().jobs().create(resolvedJobPod)
}

try {
val otherKubernetesResources =
resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
// Roll back the driver resource that was created before the failure. Dispatch on the
// runtime type of the created object rather than on the configured kind string, so the
// delete call always goes to the API group matching the resource and no unchecked cast
// can throw a ClassCastException that would hide the original error `e`.
createdDriverPod match {
  case deployment: Deployment =>
    kubernetesClient.extensions().deployments().delete(deployment)
  case pod: Pod =>
    kubernetesClient.pods().delete(pod)
  case job: Job =>
    kubernetesClient.extensions().jobs().delete(job)
  case _ =>
    // No other kind is created by this method; nothing to clean up, just rethrow below.
}
throw e
}

Expand All @@ -159,7 +208,7 @@ private[spark] class Client(

// Add an OwnerReference to the given resources making the driver pod an owner of them so when
// the driver pod is deleted, the resources are garbage collected.
private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
private def addDriverOwnerReference(driverPod: HasMetadata, resources: Seq[HasMetadata]): Unit = {
val driverPodOwnerReference = new OwnerReferenceBuilder()
.withName(driverPod.getMetadata.getName)
.withApiVersion(driverPod.getApiVersion)
Expand Down