diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
index 77921f6338c74..b2eacca042794 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -67,6 +67,7 @@ private[spark] object KubernetesVolumeUtils {
     volumeType match {
       case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
         val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
+        verifyOptionKey(options, pathKey, KUBERNETES_VOLUMES_HOSTPATH_TYPE)
         KubernetesHostPathVolumeConf(options(pathKey))
 
       case KUBERNETES_VOLUMES_PVC_TYPE =>
@@ -74,6 +75,7 @@ private[spark] object KubernetesVolumeUtils {
         val storageClassKey =
           s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY"
         val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
+        verifyOptionKey(options, claimNameKey, KUBERNETES_VOLUMES_PVC_TYPE)
         KubernetesPVCVolumeConf(
           options(claimNameKey),
           options.get(storageClassKey),
@@ -87,6 +89,8 @@ private[spark] object KubernetesVolumeUtils {
       case KUBERNETES_VOLUMES_NFS_TYPE =>
         val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
         val serverKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY"
+        verifyOptionKey(options, pathKey, KUBERNETES_VOLUMES_NFS_TYPE)
+        verifyOptionKey(options, serverKey, KUBERNETES_VOLUMES_NFS_TYPE)
         KubernetesNFSVolumeConf(
           options(pathKey),
           options(serverKey))
@@ -95,4 +99,10 @@ private[spark] object KubernetesVolumeUtils {
       throw new IllegalArgumentException(s"Kubernetes Volume type `$volumeType` is not supported")
     }
   }
+
+  private def verifyOptionKey(options: Map[String, String], key: String, msg: String): Unit = {
+    if (!options.isDefinedAt(key)) {
+      throw new NoSuchElementException(key + s" is required for $msg")
+    }
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
index 6596c5e2ad2e7..349cbd04f6027 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
@@ -118,6 +118,17 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     assert(e.getMessage.contains("hostPath.volumeName.options.path"))
   }
 
+  test("SPARK-33063: Fails on missing option key in persistentVolumeClaim") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
+    sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
+
+    val e = intercept[NoSuchElementException] {
+      KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.")
+    }
+    assert(e.getMessage.contains("persistentVolumeClaim.volumeName.options.claimName"))
+  }
+
  test("Parses read-only nfs volumes correctly") {
     val sparkConf = new SparkConf(false)
     sparkConf.set("test.nfs.volumeName.mount.path", "/path")