Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
<kubernetes.client.version>1.4.17</kubernetes.client.version>
<kubernetes.client.version>1.4.34</kubernetes.client.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ private[spark] class Client(
.endSpec()
.done()
sparkConf.set("spark.kubernetes.driver.service.name", service.getMetadata.getName)
sparkConf.set("spark.kubernetes.driver.pod.name", kubernetesAppId)

sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString)
val submitRequest = buildSubmissionRequest()
Expand All @@ -131,6 +133,23 @@ private[spark] class Client(

val podWatcher = new Watcher[Pod] {
override def eventReceived(action: Action, t: Pod): Unit = {
if (action == Action.ADDED) {
val ownerRefs = new ArrayBuffer[OwnerReference]
ownerRefs += new OwnerReferenceBuilder()
.withApiVersion(t.getApiVersion)
.withController(true)
.withKind(t.getKind)
.withName(t.getMetadata.getName)
.withUid(t.getMetadata.getUid)
.build()

secret.getMetadata().setOwnerReferences(ownerRefs.asJava)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bugs in the library prevented me from abstracting this out to a common function.

        def addOwnerReference(owner: HasMetadata, resource: HasMetadata) = {
          val ownerRefs = new ArrayBuffer[OwnerReference]
          ownerRefs += new OwnerReferenceBuilder()
            .withApiVersion(owner.getApiVersion)
            .withController(true)
            .withKind(owner.getKind)
            .withName(owner.getMetadata.getName)
            .withUid(owner.getMetadata.getUid)
            .build()
          resource.getMetadata().setOwnerReferences(ownerRefs.asJava)
          kubernetesClient.resource(resource).createOrReplace()
        }

I couldn't get the above to work for service, although it works for secrets, pods, etc.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to compile for me, does it not run properly?

            if (action == Action.ADDED) {
              def addOwnerReference(owner: HasMetadata, resource: HasMetadata) = {
                val ownerRefs = new ArrayBuffer[OwnerReference]
                ownerRefs += new OwnerReferenceBuilder()
                  .withApiVersion(owner.getApiVersion)
                  .withController(true)
                  .withKind(owner.getKind)
                  .withName(owner.getMetadata.getName)
                  .withUid(owner.getMetadata.getUid)
                  .build()
                resource.getMetadata().setOwnerReferences(ownerRefs.asJava)
                kubernetesClient.resource(resource).createOrReplace()
              }

              addOwnerReference(t, secret)
              addOwnerReference(t, service)
            }

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it doesn't work as expected for the service and fails silently without adding any owner references. Ideally, we'd want to PATCH and not PUT but that doesn't work either.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the link

kubernetesClient.secrets().createOrReplace(secret)

service.getMetadata().setOwnerReferences(ownerRefs.asJava)
kubernetesClient.services().createOrReplace(service)
}

if ((action == Action.ADDED || action == Action.MODIFIED)
&& t.getStatus.getPhase == "Running"
&& !submitCompletedFuture.isDone) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
.getOrElse(
throw new SparkException("Must specify the service name the driver is running with"))

private val kubernetesDriverPodName = conf
.getOption("spark.kubernetes.driver.pod.name")
.getOrElse(
throw new SparkException("Must specify the driver pod name"))

private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g")
private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)

Expand All @@ -82,6 +87,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val kubernetesClient = KubernetesClientBuilder
.buildFromWithinPod(kubernetesMaster, kubernetesNamespace)

val driverPod = try {
kubernetesClient.pods().inNamespace(kubernetesNamespace).
withName(kubernetesDriverPodName).get()
} catch {
case throwable: Throwable =>
logError(s"Executor cannot find driver pod.", throwable)
throw new SparkException(s"Executor cannot find driver pod", throwable)
}

override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
0.8
Expand Down Expand Up @@ -202,7 +216,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
.withNewMetadata()
.withName(name)
.withLabels(selectors)
.endMetadata()
.withOwnerReferences()
.addNewOwnerReference()
.withController(true)
.withApiVersion(driverPod.getApiVersion)
.withKind(driverPod.getKind)
.withName(driverPod.getMetadata.getName)
.withUid(driverPod.getMetadata.getUid)
.endOwnerReference()
.endMetadata()
.withNewSpec()
.addNewContainer()
.withName(s"exec-${applicationId()}-container")
Expand Down