Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,10 @@ private[spark] object Config extends Logging {
val KUBERNETES_VOLUMES_NFS_TYPE = "nfs"
val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath"
val KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY = "mount.subPathExpr"
val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
val KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY = "options.type"
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY = "options.storageClass"
val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s

private[spark] sealed trait KubernetesVolumeSpecificConf

private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
private[spark] case class KubernetesHostPathVolumeConf(hostPath: String, volumeType: String)
extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesPVCVolumeConf(
Expand All @@ -42,5 +42,6 @@ private[spark] case class KubernetesVolumeSpec(
volumeName: String,
mountPath: String,
mountSubPath: String,
mountSubPathExpr: String,
mountReadOnly: Boolean,
volumeConf: KubernetesVolumeSpecificConf)
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ object KubernetesVolumeUtils {
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
val subPathExprKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY"
val labelKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_LABEL_KEY"

val volumeLabelsMap = properties
Expand All @@ -57,6 +58,7 @@ object KubernetesVolumeUtils {
volumeName = volumeName,
mountPath = properties(pathKey),
mountSubPath = properties.getOrElse(subPathKey, ""),
mountSubPathExpr = properties.getOrElse(subPathExprKey, ""),
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = parseVolumeSpecificConf(properties,
volumeType, volumeName, Option(volumeLabelsMap)))
Expand Down Expand Up @@ -87,8 +89,11 @@ object KubernetesVolumeUtils {
volumeType match {
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
val typeKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY"
verifyOptionKey(options, pathKey, KUBERNETES_VOLUMES_HOSTPATH_TYPE)
KubernetesHostPathVolumeConf(options(pathKey))
// "" means that no checks will be performed before mounting the hostPath volume
// backward compatibility default
KubernetesHostPathVolumeConf(options(pathKey), options.getOrElse(typeKey, ""))

case KUBERNETES_VOLUMES_PVC_TYPE =>
val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
.withMountPath(spec.mountPath)
.withReadOnly(spec.mountReadOnly)
.withSubPath(spec.mountSubPath)
.withSubPathExpr(spec.mountSubPathExpr)
.withName(spec.volumeName)
.build()

val volumeBuilder = spec.volumeConf match {
case KubernetesHostPathVolumeConf(hostPath) =>
/* "" means that no checks will be performed before mounting the hostPath volume */
case KubernetesHostPathVolumeConf(hostPath, volumeType) =>
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath, ""))
.withHostPath(new HostPathVolumeSource(hostPath, volumeType))

case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels) =>
val claimName = conf match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ object KubernetesTestConf {

volumes.foreach { case spec =>
val (vtype, configs) = spec.volumeConf match {
case KubernetesHostPathVolumeConf(path) =>
(KUBERNETES_VOLUMES_HOSTPATH_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))
case KubernetesHostPathVolumeConf(hostPath, volumeType) =>
(KUBERNETES_VOLUMES_HOSTPATH_TYPE, Map(
KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> hostPath,
KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY -> volumeType))

case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels) =>
val sconf = storageClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,20 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] ===
KubernetesHostPathVolumeConf("/hostPath"))
KubernetesHostPathVolumeConf("/hostPath", ""))
}

test("Parses hostPath volume type correctly") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath")
sparkConf.set("test.hostPath.volumeName.options.type", "Type")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] ===
KubernetesHostPathVolumeConf("/hostPath", "Type"))
}

test("Parses subPath correctly") {
Expand All @@ -43,6 +56,20 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountSubPath === "subPath")
assert(volumeSpec.mountSubPathExpr === "")
}

test("Parses subPathExpr correctly") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
sparkConf.set("test.emptyDir.volumeName.mount.subPathExpr", "subPathExpr")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountSubPath === "")
assert(volumeSpec.mountSubPathExpr === "subPathExpr")
}

test("Parses persistentVolumeClaim volumes correctly") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite {
"spark-local-dir-test",
"/tmp",
"",
"",
false,
KubernetesHostPathVolumeConf("/hostPath/tmp")
KubernetesHostPathVolumeConf("/hostPath/tmp", "")
)
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val mountVolumeStep = new MountVolumesFeatureStep(kubernetesConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
"",
false,
KubernetesHostPathVolumeConf("/hostPath/tmp")
KubernetesHostPathVolumeConf("/hostPath/tmp", "")
)
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val step = new MountVolumesFeatureStep(kubernetesConf)
Expand All @@ -47,6 +48,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
"",
true,
KubernetesPVCVolumeConf("pvcClaim")
)
Expand All @@ -69,6 +71,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
"",
true,
KubernetesPVCVolumeConf("pvc-spark-SPARK_EXECUTOR_ID")
)
Expand All @@ -94,6 +97,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
"",
true,
KubernetesPVCVolumeConf("pvc-spark-SPARK_EXECUTOR_ID", Some("fast"), Some("512M"))
)
Expand All @@ -119,6 +123,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
"",
true,
KubernetesPVCVolumeConf("OnDemand")
)
Expand All @@ -136,6 +141,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
"",
true,
KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND,
storageClass = Some("gp3"),
Expand All @@ -156,6 +162,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
"",
true,
KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND,
storageClass = Some("gp3"),
Expand All @@ -177,6 +184,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"checkpointVolume1",
"/checkpoints1",
"",
"",
true,
KubernetesPVCVolumeConf(claimName = "pvcClaim1",
storageClass = Some("gp3"),
Expand All @@ -188,6 +196,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"checkpointVolume2",
"/checkpoints2",
"",
"",
true,
KubernetesPVCVolumeConf(claimName = "pvcClaim2",
storageClass = Some("gp3"),
Expand All @@ -209,6 +218,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
"",
true,
KubernetesPVCVolumeConf(MountVolumesFeatureStep.PVC_ON_DEMAND)
)
Expand All @@ -226,6 +236,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
"",
false,
KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G"))
)
Expand All @@ -249,6 +260,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
"",
false,
KubernetesEmptyDirVolumeConf(None, None)
)
Expand All @@ -271,6 +283,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
"",
false,
KubernetesNFSVolumeConf("/share/name", "nfs.example.com")
)
Expand All @@ -293,6 +306,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"",
"",
true,
KubernetesNFSVolumeConf("/share/name", "nfs.example.com")
)
Expand All @@ -315,13 +329,15 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"hpVolume",
"/tmp",
"",
"",
false,
KubernetesHostPathVolumeConf("/hostPath/tmp")
KubernetesHostPathVolumeConf("/hostPath/tmp", "")
)
val pvcVolumeConf = KubernetesVolumeSpec(
"checkpointVolume",
"/checkpoints",
"",
"",
true,
KubernetesPVCVolumeConf("pvcClaim")
)
Expand All @@ -339,13 +355,15 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"hpVolume",
"/data",
"",
"",
false,
KubernetesHostPathVolumeConf("/hostPath/tmp")
KubernetesHostPathVolumeConf("/hostPath/tmp", "")
)
val pvcVolumeConf = KubernetesVolumeSpec(
"checkpointVolume",
"/data",
"",
"",
true,
KubernetesPVCVolumeConf("pvcClaim")
)
Expand All @@ -364,6 +382,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"foo",
"",
false,
KubernetesEmptyDirVolumeConf(None, None)
)
Expand All @@ -383,6 +402,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
"bar",
"",
true,
KubernetesPVCVolumeConf("pvcClaim")
)
Expand All @@ -406,13 +426,15 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testEmptyDir",
"/tmp/foo",
"foo",
"",
true,
KubernetesEmptyDirVolumeConf(None, None)
)
val pvcSpec = KubernetesVolumeSpec(
"testPVC",
"/tmp/bar",
"bar",
"",
true,
KubernetesEmptyDirVolumeConf(None, None)
)
Expand Down