diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 388defd93465d..86d7dec2c076f 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -29,7 +29,7 @@
Spark Project Kubernetes
kubernetes
- 1.4.17
+ 1.4.34
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 537f6b6a115e9..77b7c793dc37e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -123,6 +123,8 @@ private[spark] class Client(
.endSpec()
.done()
sparkConf.set("spark.kubernetes.driver.service.name", service.getMetadata.getName)
+ sparkConf.set("spark.kubernetes.driver.pod.name", kubernetesAppId)
+
sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString)
val submitRequest = buildSubmissionRequest()
@@ -131,6 +133,23 @@ private[spark] class Client(
val podWatcher = new Watcher[Pod] {
override def eventReceived(action: Action, t: Pod): Unit = {
+ if (action == Action.ADDED) {
+ val ownerRefs = new ArrayBuffer[OwnerReference]
+ ownerRefs += new OwnerReferenceBuilder()
+ .withApiVersion(t.getApiVersion)
+ .withController(true)
+ .withKind(t.getKind)
+ .withName(t.getMetadata.getName)
+ .withUid(t.getMetadata.getUid)
+ .build()
+
+ secret.getMetadata().setOwnerReferences(ownerRefs.asJava)
+ kubernetesClient.secrets().createOrReplace(secret)
+
+ service.getMetadata().setOwnerReferences(ownerRefs.asJava)
+ kubernetesClient.services().createOrReplace(service)
+ }
+
if ((action == Action.ADDED || action == Action.MODIFIED)
&& t.getStatus.getPhase == "Running"
&& !submitCompletedFuture.isDone) {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index b7110ba901842..f512c50a9a934 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -60,6 +60,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
.getOrElse(
throw new SparkException("Must specify the service name the driver is running with"))
+ private val kubernetesDriverPodName = conf
+ .getOption("spark.kubernetes.driver.pod.name")
+ .getOrElse(
+ throw new SparkException("Must specify the driver pod name"))
+
private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g")
private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)
@@ -82,6 +87,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val kubernetesClient = KubernetesClientBuilder
.buildFromWithinPod(kubernetesMaster, kubernetesNamespace)
+ val driverPod = try {
+ kubernetesClient.pods().inNamespace(kubernetesNamespace).
+ withName(kubernetesDriverPodName).get()
+ } catch {
+ case throwable: Throwable =>
+ logError(s"Executor cannot find driver pod.", throwable)
+ throw new SparkException(s"Executor cannot find driver pod", throwable)
+ }
+
override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
0.8
@@ -202,7 +216,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
.withNewMetadata()
.withName(name)
.withLabels(selectors)
- .endMetadata()
+ .withOwnerReferences()
+ .addNewOwnerReference()
+ .withController(true)
+ .withApiVersion(driverPod.getApiVersion)
+ .withKind(driverPod.getKind)
+ .withName(driverPod.getMetadata.getName)
+ .withUid(driverPod.getMetadata.getUid)
+ .endOwnerReference()
+ .endMetadata()
.withNewSpec()
.addNewContainer()
.withName(s"exec-${applicationId()}-container")