Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ private[spark] object KubernetesVolumeUtils {
volumeType match {
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
verifyOptionKey(options, pathKey, KUBERNETES_VOLUMES_HOSTPATH_TYPE)
KubernetesHostPathVolumeConf(options(pathKey))

case KUBERNETES_VOLUMES_PVC_TYPE =>
val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
val storageClassKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY"
val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
verifyOptionKey(options, claimNameKey, KUBERNETES_VOLUMES_PVC_TYPE)
KubernetesPVCVolumeConf(
options(claimNameKey),
options.get(storageClassKey),
Expand All @@ -87,6 +89,8 @@ private[spark] object KubernetesVolumeUtils {
case KUBERNETES_VOLUMES_NFS_TYPE =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
val serverKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY"
verifyOptionKey(options, pathKey, KUBERNETES_VOLUMES_NFS_TYPE)
verifyOptionKey(options, serverKey, KUBERNETES_VOLUMES_NFS_TYPE)
KubernetesNFSVolumeConf(
options(pathKey),
options(serverKey))
Expand All @@ -95,4 +99,10 @@ private[spark] object KubernetesVolumeUtils {
throw new IllegalArgumentException(s"Kubernetes Volume type `$volumeType` is not supported")
}
}

private def verifyOptionKey(options: Map[String, String], key: String, msg: String): Unit = {
if (!options.isDefinedAt(key)) {
throw new NoSuchElementException(key + s" is required for $msg")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(e.getMessage.contains("hostPath.volumeName.options.path"))
}

test("SPARK-33063: Fails on missing option key in persistentVolumeClaim") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")

val e = intercept[NoSuchElementException] {
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.")
}
assert(e.getMessage.contains("persistentVolumeClaim.volumeName.options.claimName"))
}

test("Parses read-only nfs volumes correctly") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.nfs.volumeName.mount.path", "/path")
Expand Down