-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-33063][K8S] Improve error message for insufficient K8s volume confs #29941
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,9 +16,12 @@ | |
| */ | ||
| package org.apache.spark.deploy.k8s | ||
|
|
||
| import scala.util.Try | ||
|
|
||
| import org.apache.spark.SparkConf | ||
| import org.apache.spark.deploy.k8s.Config._ | ||
|
|
||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you remove this addition? The existing one line looks okay to me. (https://github.com/databricks/scala-style-guide#blank-lines-vertical-whitespace)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes! just waiting for the checks to pass. Removing now |
||
| private[spark] object KubernetesVolumeUtils { | ||
| /** | ||
| * Extract Spark volume configuration properties with a given name prefix. | ||
|
|
@@ -38,7 +41,7 @@ private[spark] object KubernetesVolumeUtils { | |
| KubernetesVolumeSpec( | ||
| volumeName = volumeName, | ||
| mountPath = properties(pathKey), | ||
| mountSubPath = properties.get(subPathKey).getOrElse(""), | ||
| mountSubPath = properties.getOrElse(subPathKey, ""), | ||
|
Gschiavon marked this conversation as resolved.
Outdated
|
||
| mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean), | ||
| volumeConf = parseVolumeSpecificConf(properties, volumeType, volumeName)) | ||
| }.toSeq | ||
|
|
@@ -67,17 +70,31 @@ private[spark] object KubernetesVolumeUtils { | |
| volumeType match { | ||
| case KUBERNETES_VOLUMES_HOSTPATH_TYPE => | ||
| val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY" | ||
| KubernetesHostPathVolumeConf(options(pathKey)) | ||
| Try(KubernetesHostPathVolumeConf(options(pathKey))) | ||
| .fold( | ||
| _ => throw new NoSuchElementException(s"When using $KUBERNETES_VOLUMES_HOSTPATH_TYPE " + | ||
| "Kubernetes volumes, it is necessary to define " + | ||
| s"$KUBERNETES_VOLUMES_HOSTPATH_TYPE.volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY" + | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didnt use path key because it includes the volume name given by the user and I wasn't sure about this. I've added it. |
||
| s" property"), | ||
|
Gschiavon marked this conversation as resolved.
Outdated
|
||
| kubernetesHostPathVolumeConf => kubernetesHostPathVolumeConf | ||
| ) | ||
|
|
||
| case KUBERNETES_VOLUMES_PVC_TYPE => | ||
| val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY" | ||
| val storageClassKey = | ||
| s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY" | ||
| val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY" | ||
| KubernetesPVCVolumeConf( | ||
| Try(KubernetesPVCVolumeConf( | ||
| options(claimNameKey), | ||
| options.get(storageClassKey), | ||
| options.get(sizeLimitKey)) | ||
| options.get(sizeLimitKey))) | ||
| .fold( | ||
| _ => throw new NoSuchElementException(s"When using $KUBERNETES_VOLUMES_PVC_TYPE " + | ||
| "Kubernetes volumes, it is necessary to define " + | ||
| s"$KUBERNETES_VOLUMES_PVC_TYPE.volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY" | ||
| + s" property"), | ||
|
Gschiavon marked this conversation as resolved.
Outdated
|
||
| kubernetesPVCVolumeConf => kubernetesPVCVolumeConf | ||
| ) | ||
|
|
||
| case KUBERNETES_VOLUMES_EMPTYDIR_TYPE => | ||
| val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY" | ||
|
|
@@ -87,9 +104,17 @@ private[spark] object KubernetesVolumeUtils { | |
| case KUBERNETES_VOLUMES_NFS_TYPE => | ||
| val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY" | ||
| val serverKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY" | ||
| KubernetesNFSVolumeConf( | ||
| Try(KubernetesNFSVolumeConf( | ||
| options(pathKey), | ||
| options(serverKey)) | ||
| options(serverKey))) | ||
| .fold( | ||
| _ => throw new NoSuchElementException(s"When using $KUBERNETES_VOLUMES_NFS_TYPE " + | ||
| "Kubernetes volumes, it is necessary to define " + | ||
| s"$KUBERNETES_VOLUMES_NFS_TYPE.volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY and " + | ||
| s"$KUBERNETES_VOLUMES_NFS_TYPE.volumeName.$KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY" + | ||
| s" properties"), | ||
|
Gschiavon marked this conversation as resolved.
Outdated
|
||
| kubernetesNFSVolumeConf => kubernetesNFSVolumeConf | ||
| ) | ||
|
|
||
| case _ => | ||
| throw new IllegalArgumentException(s"Kubernetes Volume type `$volumeType` is not supported") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -96,7 +96,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { | |
| assert(volumeSpec.mountReadOnly === false) | ||
| } | ||
|
|
||
| test("Fails on missing mount key") { | ||
| test("Fails on missing mount key in emptyDir") { | ||
|
Gschiavon marked this conversation as resolved.
Outdated
|
||
| val sparkConf = new SparkConf(false) | ||
| sparkConf.set("test.emptyDir.volumeName.mnt.path", "/path") | ||
|
|
||
|
|
@@ -106,7 +106,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { | |
| assert(e.getMessage.contains("emptyDir.volumeName.mount.path")) | ||
| } | ||
|
|
||
| test("Fails on missing option key") { | ||
| test("Fails on missing option key in hostPath") { | ||
|
Gschiavon marked this conversation as resolved.
Outdated
|
||
| val sparkConf = new SparkConf(false) | ||
| sparkConf.set("test.hostPath.volumeName.mount.path", "/path") | ||
| sparkConf.set("test.hostPath.volumeName.mount.readOnly", "true") | ||
|
|
@@ -118,6 +118,18 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { | |
| assert(e.getMessage.contains("hostPath.volumeName.options.path")) | ||
| } | ||
|
|
||
| test("Fails on missing option key in persistentVolumeClaim") { | ||
|
Gschiavon marked this conversation as resolved.
Outdated
|
||
| val sparkConf = new SparkConf(false) | ||
| sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path") | ||
| sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true") | ||
| sparkConf.set("test.persistentVolumeClaim.volumeName.options.clamName", "claimeName") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Although I understand your intention to use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree! I did it to follow the "style" of the other tests, but I'll remove it. |
||
|
|
||
| val e = intercept[NoSuchElementException] { | ||
| KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.") | ||
| } | ||
| assert(e.getMessage.contains("persistentVolumeClaim.volumeName.options.claimName")) | ||
| } | ||
|
|
||
| test("Parses read-only nfs volumes correctly") { | ||
| val sparkConf = new SparkConf(false) | ||
| sparkConf.set("test.nfs.volumeName.mount.path", "/path") | ||
|
|
@@ -148,7 +160,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { | |
| KubernetesNFSVolumeConf("/share", "nfs.example.com")) | ||
| } | ||
|
|
||
| test("Fails on missing path option") { | ||
| test("Fails on missing path option in nfs") { | ||
|
Gschiavon marked this conversation as resolved.
Outdated
|
||
| val sparkConf = new SparkConf(false) | ||
| sparkConf.set("test.nfs.volumeName.mount.path", "/path") | ||
| sparkConf.set("test.nfs.volumeName.mount.readOnly", "true") | ||
|
|
@@ -160,7 +172,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { | |
| assert(e.getMessage.contains("nfs.volumeName.options.path")) | ||
| } | ||
|
|
||
| test("Fails on missing server option") { | ||
| test("Fails on missing server option in nfs") { | ||
|
Gschiavon marked this conversation as resolved.
Outdated
|
||
| val sparkConf = new SparkConf(false) | ||
| sparkConf.set("test.nfs.volumeName.mount.path", "/path") | ||
| sparkConf.set("test.nfs.volumeName.mount.readOnly", "true") | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.