@@ -439,6 +439,7 @@ private[spark] object Config extends Logging {
val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
+  val KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY = "options.storageClass"
val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"
val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server"
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.HasMetadata

private[spark] case class KubernetesExecutorSpec(
    pod: SparkPod,
    executorKubernetesResources: Seq[HasMetadata])
@@ -19,11 +19,11 @@ package org.apache.spark.deploy.k8s
import java.io.{File, IOException}
import java.net.URI
import java.security.SecureRandom
-import java.util.UUID
+import java.util.{Collections, UUID}

import scala.collection.JavaConverters._

-import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder, Quantity}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -323,4 +323,22 @@ private[spark] object KubernetesUtils extends Logging {
.build()
}
}

+  // Add an OwnerReference to the given resources, making the pod an owner of them so that when
+  // the pod is deleted, the resources are garbage collected.
+  def addOwnerReference(pod: Pod, resources: Seq[HasMetadata]): Unit = {
Reviewer comment (Contributor): I like this :) Initially I was going to ask about re-using the PVCs, because I know that in some systems PVC creation is slow (for dynamic scaling), but it's probably better to return the resources than to build some weird storage pool like I was imagining.

+    if (pod != null) {
+      val reference = new OwnerReferenceBuilder()
+        .withName(pod.getMetadata.getName)
+        .withApiVersion(pod.getApiVersion)
+        .withUid(pod.getMetadata.getUid)
+        .withKind(pod.getKind)
+        .withController(true)
+        .build()
+      resources.foreach { resource =>
+        val originalMetadata = resource.getMetadata
+        originalMetadata.setOwnerReferences(Collections.singletonList(reference))
+      }
+    }
+  }
}
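For context, this helper is consumed by both the driver and executor code paths changed below. A condensed usage sketch, mirroring the Client.scala call site in this PR (`kubernetesClient`, `resolvedDriverPod`, and `configMap` stand in for the real values built there):

// Condensed sketch of the driver-side call site; names are stand-ins.
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
val otherResources: Seq[HasMetadata] = Seq(configMap)
// Make the created pod the owner, so Kubernetes garbage-collects these with it.
KubernetesUtils.addOwnerReference(createdDriverPod, otherResources)
kubernetesClient.resourceList(otherResources: _*).createOrReplace()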
@@ -21,7 +21,10 @@ private[spark] sealed trait KubernetesVolumeSpecificConf
private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
extends KubernetesVolumeSpecificConf

-private[spark] case class KubernetesPVCVolumeConf(claimName: String)
+private[spark] case class KubernetesPVCVolumeConf(
+    claimName: String,
+    storageClass: Option[String] = None,
+    size: Option[String] = None)
Comment on lines +26 to +27:

Reviewer (Member): Must storageClass and size both be defined or both empty, or can we define only storageClass without size?

Author (dongjoon-hyun, Member): Yes, we need both. In this PR, if either one is missing, the fallback is the existing PVC mounting behavior: Spark doesn't try to create a PVC and assumes a PVC with the given name already exists. (See the configuration sketch after this hunk.)

extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesEmptyDirVolumeConf(
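To illustrate the Q&A above, here is a hypothetical executor volume configuration; the volume name "data", storage class "gp2", and size "500Gi" are example values, not defaults. With all three options set, a PVC is created on demand per executor; if storageClass or sizeLimit is omitted, Spark assumes the named claim already exists and only mounts it:

import org.apache.spark.SparkConf

// Hypothetical on-demand PVC configuration; key layout follows the
// spark.kubernetes.executor.volumes.[type].[name].* convention parsed below.
val sparkConf = new SparkConf()
  .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path", "/data")
  .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName", "OnDemand")
  .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass", "gp2")
  .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit", "500Gi")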
@@ -71,7 +71,13 @@ private[spark] object KubernetesVolumeUtils {

case KUBERNETES_VOLUMES_PVC_TYPE =>
val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
-        KubernetesPVCVolumeConf(options(claimNameKey))
+        val storageClassKey =
+          s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY"
+        val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
+        KubernetesPVCVolumeConf(
+          options(claimNameKey),
+          options.get(storageClassKey),
+          options.get(sizeLimitKey))

case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
@@ -16,13 +16,19 @@
*/
package org.apache.spark.deploy.k8s.features

+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer

import io.fabric8.kubernetes.api.model._

import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Constants.ENV_EXECUTOR_ID

private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {
+  import MountVolumesFeatureStep._
+
+  val additionalResources = ArrayBuffer.empty[HasMetadata]

override def configurePod(pod: SparkPod): SparkPod = {
val (volumeMounts, volumes) = constructVolumes(conf.volumes).unzip
@@ -43,7 +49,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
private def constructVolumes(
volumeSpecs: Iterable[KubernetesVolumeSpec]
): Iterable[(VolumeMount, Volume)] = {
-    volumeSpecs.map { spec =>
+    volumeSpecs.zipWithIndex.map { case (spec, i) =>
val volumeMount = new VolumeMountBuilder()
.withMountPath(spec.mountPath)
.withReadOnly(spec.mountReadOnly)
@@ -57,10 +63,32 @@
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath, ""))

-      case KubernetesPVCVolumeConf(claimNameTemplate) =>
+      case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
val claimName = conf match {
case c: KubernetesExecutorConf =>
-            claimNameTemplate.replaceAll(ENV_EXECUTOR_ID, c.executorId)
+            val claimName = claimNameTemplate
+              .replaceAll(PVC_ON_DEMAND,
+                s"${conf.resourceNamePrefix}-exec-${c.executorId}$PVC_POSTFIX-$i")
+              .replaceAll(ENV_EXECUTOR_ID, c.executorId)

+            if (storageClass.isDefined && size.isDefined) {
+              additionalResources.append(new PersistentVolumeClaimBuilder()
+                .withKind(PVC)
+                .withApiVersion("v1")
+                .withNewMetadata()
+                  .withName(claimName)
+                .endMetadata()
+                .withNewSpec()
+                  .withStorageClassName(storageClass.get)
+                  .withAccessModes(PVC_ACCESS_MODE)
+                  .withResources(new ResourceRequirementsBuilder()
+                    .withRequests(Map("storage" -> new Quantity(size.get)).asJava).build())
+                .endSpec()
+                .build())
+            }
+
+            claimName

case _ =>
claimNameTemplate
}
@@ -84,4 +112,15 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
(volumeMount, volume)
}
}

+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    additionalResources
+  }
}

+private[spark] object MountVolumesFeatureStep {
+  val PVC_ON_DEMAND = "OnDemand"
+  val PVC = "PersistentVolumeClaim"
+  val PVC_POSTFIX = "-pvc"
+  val PVC_ACCESS_MODE = "ReadWriteOnce"
Comment on lines +122 to +125:

Reviewer (Contributor): Would it make sense to have the OnDemand placeholder, the postfix, and the access mode configurable?

Author (dongjoon-hyun, Member, Sep 23, 2020): It's possible, but I didn't do that in this PR for the following reasons:

  • PVC_ON_DEMAND: this is a placeholder, because the existing code expects pre-defined claim names. We are better off recommending a single fixed value than making it configurable.
  • PVC_POSTFIX: it could be configurable, but that gives users little benefit, because it is part of a transient id and the prefix already guarantees no conflicts.
  • PVC_ACCESS_MODE: I thought the same at first, but I switched to a fixed value to reduce the problem surface. Although a PVC_ACCESS_MODE config makes a lot of sense, this PR focuses on generating a new PVC for each executor; it does not propose creating a ReadWriteMany PVC shared across multiple executors.

For a ReadWriteMany PVC, this PR isn't needed: the existing Spark PVC feature can already mount a single ReadWriteMany PVC into all executors, and because it is always a single claim there is no maintenance burden. In addition, we also support NFS (AWS EFS) mounting. (A sketch of that static configuration follows this hunk.)

+}
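For contrast with the on-demand path, a sketch of the pre-existing static configuration referenced above: one pre-created ReadWriteMany claim (the name "existing-rwx-claim" and volume name "shared" are hypothetical) mounted into every executor. No storageClass or sizeLimit is given, so nothing is created on demand:

import org.apache.spark.SparkConf

// Static mounting of a single shared ReadWriteMany PVC across all executors.
val sharedConf = new SparkConf()
  .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.shared.mount.path", "/shared")
  .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.shared.options.claimName", "existing-rwx-claim")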
@@ -20,18 +20,20 @@
import java.io.StringWriter
import java.util.{Collections, UUID}
import java.util.Properties

+import scala.collection.mutable
+import scala.util.control.Breaks._
+import scala.util.control.NonFatal

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
import io.fabric8.kubernetes.client.Watcher.Action
-import scala.collection.mutable
-import scala.util.control.NonFatal
-import util.control.Breaks._

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

@@ -134,7 +136,7 @@ private[spark] class Client(
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-      addOwnerReference(createdDriverPod, otherKubernetesResources)
+      addOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
@@ -163,22 +165,6 @@ private[spark] class Client(
}
}

-  // Add a OwnerReference to the given resources making the driver pod an owner of them so when
-  // the driver pod is deleted, the resources are garbage collected.
-  private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
-    val driverPodOwnerReference = new OwnerReferenceBuilder()
-      .withName(driverPod.getMetadata.getName)
-      .withApiVersion(driverPod.getApiVersion)
-      .withUid(driverPod.getMetadata.getUid)
-      .withKind(driverPod.getKind)
-      .withController(true)
-      .build()
-    resources.foreach { resource =>
-      val originalMetadata = resource.getMetadata
-      originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
-    }
-  }

Comment on lines -166 to -181:

Reviewer (Contributor): Thank you for unifying this code :)

// Build a Config Map that will house spark conf properties in a single file for spark-submit
private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = {
val properties = new Properties()
Expand Down
@@ -18,14 +18,17 @@ package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}

-import io.fabric8.kubernetes.api.model.PodBuilder
-import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.mutable
import scala.util.control.NonFatal

+import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.internal.Logging
import org.apache.spark.util.{Clock, Utils}

@@ -212,16 +215,33 @@ private[spark] class ExecutorPodsAllocator(
newExecutorId.toString,
applicationId,
driverPod)
-      val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr,
+      val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,
kubernetesClient)
+      val executorPod = resolvedExecutorSpec.pod
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
.editOrNewSpec()
.addToContainers(executorPod.container)
.endSpec()
.build()
-      kubernetesClient.pods().create(podWithAttachedContainer)
-      newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
-      logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+      val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
+      try {
+        val resources = resolvedExecutorSpec.executorKubernetesResources
+        addOwnerReference(createdExecutorPod, resources)
+        resources
+          .filter(_.getKind == "PersistentVolumeClaim")
+          .foreach { resource =>
+            val pvc = resource.asInstanceOf[PersistentVolumeClaim]
+            logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
+              s"StorageClass ${pvc.getSpec.getStorageClassName}")
+            kubernetesClient.persistentVolumeClaims().create(pvc)
+          }
+        newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
+        logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+      } catch {
+        case NonFatal(e) =>
+          kubernetesClient.pods().delete(createdExecutorPod)
+          throw e
+      }
}
}

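Tying the pieces together, a hypothetical check of what the allocator now produces for executor 1, assuming a resourceNamePrefix of "spark-pi-1234" (invented here), a single OnDemand volume (index 0), and the storageClass from the earlier configuration sketch:

// Inside the allocator context above, after buildFromFeatures has run.
val pvc = resolvedExecutorSpec.executorKubernetesResources.head
  .asInstanceOf[PersistentVolumeClaim]
assert(pvc.getMetadata.getName == "spark-pi-1234-exec-1-pvc-0")
assert(pvc.getSpec.getAccessModes.get(0) == "ReadWriteOnce")
assert(pvc.getSpec.getStorageClassName == "gp2")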
@@ -29,7 +29,7 @@ private[spark] class KubernetesExecutorBuilder {
def buildFromFeatures(
conf: KubernetesExecutorConf,
secMgr: SecurityManager,
-      client: KubernetesClient): SparkPod = {
+      client: KubernetesClient): KubernetesExecutorSpec = {
val initialPod = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
.map { file =>
KubernetesUtils.loadPodFromTemplate(
@@ -47,7 +47,17 @@ private[spark] class KubernetesExecutorBuilder {
new MountVolumesFeatureStep(conf),
new LocalDirsFeatureStep(conf))

-    features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
+    val spec = KubernetesExecutorSpec(
+      initialPod,
+      executorKubernetesResources = Seq.empty)
+
+    features.foldLeft(spec) { case (spec, feature) =>
+      val configuredPod = feature.configurePod(spec.pod)
+      val addedResources = feature.getAdditionalKubernetesResources()
+      KubernetesExecutorSpec(
+        configuredPod,
+        spec.executorKubernetesResources ++ addedResources)
+    }
}

}
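The fold relies on the KubernetesFeatureConfigStep contract: each step may rewrite the pod and contribute extra resources. An illustrative (hypothetical) step — note the trait is private[spark], so this is a sketch of the internal contract, not a user extension point:

import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, HasMetadata}
import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

class ExampleFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod = pod // leave the pod unchanged
  override def getAdditionalKubernetesResources(): Seq[HasMetadata] =
    Seq(new ConfigMapBuilder()
      .withNewMetadata().withName("example-config-map").endMetadata()
      .build()) // ends up in executorKubernetesResources via the fold above
}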
@@ -114,9 +114,12 @@ object KubernetesTestConf {
(KUBERNETES_VOLUMES_HOSTPATH_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))

-      case KubernetesPVCVolumeConf(claimName) =>
+      case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit) =>
+        val sconf = storageClass
+          .map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY, s) }.toMap
+        val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap
(KUBERNETES_VOLUMES_PVC_TYPE,
-          Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName))
+          Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++ sconf ++ lconf)

case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap
@@ -89,6 +89,23 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(executorPVC.getClaimName === s"pvc-spark-${KubernetesTestConf.EXECUTOR_ID}")
}

test("Create and mount persistentVolumeClaims in executors") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
true,
KubernetesPVCVolumeConf(MountVolumesFeatureStep.PVC_ON_DEMAND)
)
val executorConf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeConf))
val executorStep = new MountVolumesFeatureStep(executorConf)
val executorPod = executorStep.configurePod(SparkPod.initialPod())

assert(executorPod.pod.getSpec.getVolumes.size() === 1)
val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
}

test("Mounts emptyDir") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
@@ -27,7 +27,7 @@ import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
@@ -202,9 +202,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(!podsAllocatorUnderTest.isDeleted("4"))
}

-  private def executorPodAnswer(): Answer[SparkPod] =
+  private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
(invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
-      executorPodWithId(k8sConf.executorId.toInt)
+      KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt), Seq.empty)
}
}
@@ -32,7 +32,7 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
sparkConf.set("spark.driver.host", "https://driver.host.com")
val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
val secMgr = new SecurityManager(sparkConf)
-    new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client)
+    new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client).pod
}

}