Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,10 @@ private[spark] object Config extends Logging {
val KUBERNETES_VOLUMES_NFS_TYPE = "nfs"
val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath"
val KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY = "mount.subPathExpr"
val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
val KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY = "options.type"
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY = "options.storageClass"
val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s

private[spark] sealed trait KubernetesVolumeSpecificConf

private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
private[spark] case class KubernetesHostPathVolumeConf(hostPath: String, volumeType: String)
extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesPVCVolumeConf(
Expand All @@ -42,5 +42,6 @@ private[spark] case class KubernetesVolumeSpec(
volumeName: String,
mountPath: String,
mountSubPath: String,
mountSubPathExpr: String,
mountReadOnly: Boolean,
volumeConf: KubernetesVolumeSpecificConf)
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ object KubernetesVolumeUtils {
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
val subPathExprKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY"
val labelKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_LABEL_KEY"
verifyMutuallyExclusiveOptionKeys(properties, subPathKey, subPathExprKey)

val volumeLabelsMap = properties
.filter(_._1.startsWith(labelKey))
Expand All @@ -57,6 +59,7 @@ object KubernetesVolumeUtils {
volumeName = volumeName,
mountPath = properties(pathKey),
mountSubPath = properties.getOrElse(subPathKey, ""),
mountSubPathExpr = properties.getOrElse(subPathExprKey, ""),
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = parseVolumeSpecificConf(properties,
volumeType, volumeName, Option(volumeLabelsMap)))
Expand Down Expand Up @@ -87,8 +90,11 @@ object KubernetesVolumeUtils {
volumeType match {
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
val typeKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY"
verifyOptionKey(options, pathKey, KUBERNETES_VOLUMES_HOSTPATH_TYPE)
KubernetesHostPathVolumeConf(options(pathKey))
// "" means that no checks will be performed before mounting the hostPath volume
// backward compatibility default
KubernetesHostPathVolumeConf(options(pathKey), options.getOrElse(typeKey, ""))

case KUBERNETES_VOLUMES_PVC_TYPE =>
val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
Expand Down Expand Up @@ -129,6 +135,16 @@ object KubernetesVolumeUtils {
}
}

private def verifyMutuallyExclusiveOptionKeys(
options: Map[String, String],
keys: String*): Unit = {
val givenKeys = keys.filter(options.contains)
if (givenKeys.length > 1) {
throw new IllegalArgumentException("These config options are mutually exclusive: " +
s"${givenKeys.mkString(", ")}")
}
}

private def verifySize(size: Option[String]): Unit = {
size.foreach { v =>
if (v.forall(_.isDigit) && parseLong(v) < 1024) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
.withMountPath(spec.mountPath)
.withReadOnly(spec.mountReadOnly)
.withSubPath(spec.mountSubPath)
.withSubPathExpr(spec.mountSubPathExpr)
.withName(spec.volumeName)
.build()

val volumeBuilder = spec.volumeConf match {
case KubernetesHostPathVolumeConf(hostPath) =>
/* "" means that no checks will be performed before mounting the hostPath volume */
case KubernetesHostPathVolumeConf(hostPath, volumeType) =>
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath, ""))
.withHostPath(new HostPathVolumeSource(hostPath, volumeType))

case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels) =>
val claimName = conf match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ object KubernetesTestConf {

volumes.foreach { case spec =>
val (vtype, configs) = spec.volumeConf match {
case KubernetesHostPathVolumeConf(path) =>
(KUBERNETES_VOLUMES_HOSTPATH_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))
case KubernetesHostPathVolumeConf(hostPath, volumeType) =>
(KUBERNETES_VOLUMES_HOSTPATH_TYPE, Map(
KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> hostPath,
KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY -> volumeType))

case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels) =>
val sconf = storageClass
Expand Down Expand Up @@ -145,6 +146,10 @@ object KubernetesTestConf {
conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY),
spec.mountSubPath)
}
if (spec.mountSubPathExpr.nonEmpty) {
conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY),
spec.mountSubPathExpr)
}
conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_READONLY_KEY),
spec.mountReadOnly.toString)
configs.foreach { case (k, v) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,20 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] ===
KubernetesHostPathVolumeConf("/hostPath"))
KubernetesHostPathVolumeConf("/hostPath", ""))
}

test("Parses hostPath volume type correctly") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath")
sparkConf.set("test.hostPath.volumeName.options.type", "Type")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] ===
KubernetesHostPathVolumeConf("/hostPath", "Type"))
}

test("Parses subPath correctly") {
Expand All @@ -43,6 +56,33 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountSubPath === "subPath")
assert(volumeSpec.mountSubPathExpr === "")
}

test("Parses subPathExpr correctly") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
sparkConf.set("test.emptyDir.volumeName.mount.subPathExpr", "subPathExpr")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountSubPath === "")
assert(volumeSpec.mountSubPathExpr === "subPathExpr")
}

test("Rejects mutually exclusive subPath and subPathExpr") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
sparkConf.set("test.emptyDir.volumeName.mount.subPath", "subPath")
sparkConf.set("test.emptyDir.volumeName.mount.subPathExpr", "subPathExpr")

val msg = intercept[IllegalArgumentException] {
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
}.getMessage
assert(msg === "These config options are mutually exclusive: " +
"emptyDir.volumeName.mount.subPath, emptyDir.volumeName.mount.subPathExpr")
}

test("Parses persistentVolumeClaim volumes correctly") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite {
"spark-local-dir-test",
"/tmp",
"",
"",
false,
KubernetesHostPathVolumeConf("/hostPath/tmp")
KubernetesHostPathVolumeConf("/hostPath/tmp", "")
)
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val mountVolumeStep = new MountVolumesFeatureStep(kubernetesConf)
Expand Down
Loading