Commit 04a6670

mccheah authored and onursatici committed
Add back Kubernetes local file mounting. (apache#521)
* Add back local file mounting.
* Commit back entrypoint changes, add unit test
* Remove unnecessary whitespace change
1 parent 69247ad commit 04a6670
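
As a hedged, illustrative sketch of what the restored feature does end to end (none of the names below are from the commit: the master URL, app name, and file path are hypothetical): submitter-local entries in spark.files are packed into a Kubernetes secret by the new driver feature step, mounted into every container at /var/data/spark-submitted-files, and copied into the working directory by the container entrypoint.

    import org.apache.spark.SparkConf

    // Hypothetical submission-time configuration; only the behavior is from this commit.
    val conf = new SparkConf()
      .setMaster("k8s://https://kubernetes.example.com:6443")  // hypothetical API server
      .setAppName("small-files-demo")                          // hypothetical app name
      .set("spark.files", "/home/me/lookup.txt")               // submitter-local file
    // With this change, the driver rewrites the local entry to
    // /var/data/spark-submitted-files/lookup.txt and records the generated secret's
    // name under spark.kubernetes.mountdependencies.smallfiles.executor.secretName.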

9 files changed, +285 -2 lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 24 additions & 0 deletions

@@ -325,6 +325,30 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val FILES_DOWNLOAD_LOCATION =
+    ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
+      .doc("Location to download files to in the driver and executors. When using " +
+        "spark-submit, this directory must be empty and will be mounted as an empty directory " +
+        "volume on the driver and executor pods.")
+      .stringConf
+      .createWithDefault("/var/spark-data/spark-files")
+
+  val EXECUTOR_SUBMITTED_SMALL_FILES_SECRET =
+    ConfigBuilder("spark.kubernetes.mountdependencies.smallfiles.executor.secretName")
+      .doc("Name of the secret that should be mounted into the executor containers for" +
+        " distributing submitted small files without the resource staging server.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH =
+    ConfigBuilder("spark.kubernetes.mountdependencies.smallfiles.executor.secretMountPath")
+      .doc(s"Mount path in the executors for the secret given by" +
+        s" ${EXECUTOR_SUBMITTED_SMALL_FILES_SECRET.key}")
+      .internal()
+      .stringConf
+      .createOptional
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
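
As a minimal sketch of how these entries behave at the SparkConf level (conf here is constructed purely for illustration; the key strings and defaults are taken verbatim from the hunk above):

    import org.apache.spark.SparkConf

    val conf = new SparkConf(false)
    // The download directory falls back to its declared default when unset:
    val filesDownloadDir = conf.get(
      "spark.kubernetes.mountDependencies.filesDownloadDir",
      "/var/spark-data/spark-files")
    // The smallfiles keys are internal and optional: absent unless a feature step
    // (or a test) sets them explicitly.
    val smallFilesSecret = conf.getOption(
      "spark.kubernetes.mountdependencies.smallfiles.executor.secretName")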

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 4 additions & 0 deletions

@@ -68,6 +68,10 @@ private[spark] object Constants {
   val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
   val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
 
+  // Local file mounting constants
+  val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"
+  val MOUNTED_FILES_SECRET_DIR = "/var/data/spark-submitted-files"
+
   // BINDINGS
   val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 11 additions & 0 deletions

@@ -205,4 +205,15 @@ private[spark] object KubernetesUtils extends Logging {
   def formatTime(time: String): String = {
     if (time != null) time else "N/A"
   }
+
+  def submitterLocalFiles(fileUris: Iterable[String]): Iterable[String] = {
+    fileUris
+      .map(Utils.resolveURI)
+      .filter { file =>
+        Option(file.getScheme).getOrElse("file") == "file"
+      }
+      .map(_.getPath)
+      .map(new File(_))
+      .map(_.getAbsolutePath)
+  }
 }
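
A usage sketch for the new helper, with made-up input URIs (callable from Spark-internal code, since the enclosing object is private[spark]): schemeless and file:// URIs are kept and resolved to absolute local paths, while remote schemes are filtered out.

    import org.apache.spark.deploy.k8s.KubernetesUtils

    // Hypothetical inputs mixing local and remote URIs.
    val uris = Seq(
      "/tmp/lookup.txt",                // no scheme, so treated as "file"
      "file:///tmp/extra.txt",          // explicit file scheme, kept
      "https://host:9000/remote.txt")   // remote scheme, dropped
    val localOnly = KubernetesUtils.submitterLocalFiles(uris)
    // localOnly holds the absolute paths of lookup.txt and extra.txt only.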
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountLocalFilesFeatureStep.scala

Lines changed: 127 additions & 0 deletions

@@ -0,0 +1,127 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.k8s.features

import java.io.File
import java.nio.file.Paths

import scala.collection.JavaConverters._

import com.google.common.io.{BaseEncoding, Files}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, SecretBuilder}

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, PythonMainAppResource, RMainAppResource}
import org.apache.spark.util.Utils

private[spark] class MountLocalDriverFilesFeatureStep(kubernetesConf: KubernetesDriverConf)
  extends MountLocalFilesFeatureStep(kubernetesConf) {

  val allFiles: Seq[String] = {
    Utils.stringToSeq(kubernetesConf.sparkConf.get("spark.files", "")) ++
      kubernetesConf.pyFiles ++
      (kubernetesConf.mainAppResource match {
        case JavaMainAppResource(_) => Nil
        case PythonMainAppResource(res) => Seq(res)
        case RMainAppResource(res) => Seq(res)
      })
  }
}

private[spark] class MountLocalExecutorFilesFeatureStep(
    kubernetesConf: KubernetesConf)
  extends MountLocalFilesFeatureStep(kubernetesConf) {

  val allFiles: Seq[String] = Nil
}

private[spark] abstract class MountLocalFilesFeatureStep(
    kubernetesConf: KubernetesConf)
  extends KubernetesFeatureConfigStep {

  private val secretName = {
    kubernetesConf.get(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET)
      .getOrElse(s"${kubernetesConf.resourceNamePrefix}-mounted-small-files")
  }

  override def configurePod(pod: SparkPod): SparkPod = {
    val resolvedPod = new PodBuilder(pod.pod)
      .editOrNewSpec()
        .addNewVolume()
          .withName("submitted-files")
          .withNewSecret()
            .withSecretName(secretName)
            .endSecret()
          .endVolume()
        .endSpec()
      .build()
    val resolvedContainer = new ContainerBuilder(pod.container)
      .addNewEnv()
        .withName(ENV_MOUNTED_FILES_FROM_SECRET_DIR)
        .withValue(MOUNTED_FILES_SECRET_DIR)
        .endEnv()
      .addNewVolumeMount()
        .withName("submitted-files")
        .withMountPath(MOUNTED_FILES_SECRET_DIR)
        .endVolumeMount()
      .build()
    SparkPod(resolvedPod, resolvedContainer)
  }

  override def getAdditionalPodSystemProperties(): Map[String, String] = {
    val resolvedFiles = allFiles()
      .map(file => {
        val uri = Utils.resolveURI(file)
        val scheme = Option(uri.getScheme).getOrElse("file")
        if (scheme != "file") {
          file
        } else {
          val fileName = Paths.get(uri.getPath).getFileName.toString
          s"$MOUNTED_FILES_SECRET_DIR/$fileName"
        }
      })
    Map(
      EXECUTOR_SUBMITTED_SMALL_FILES_SECRET.key -> secretName,
      "spark.files" -> resolvedFiles.mkString(","))
  }

  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    val localFiles = allFiles()
      .map(Utils.resolveURI)
      .filter { file =>
        Option(file.getScheme).getOrElse("file") == "file"
      }
      .map(_.getPath)
      .map(new File(_))
    val localFileBase64Contents = localFiles.map { file =>
      val fileBase64 = BaseEncoding.base64().encode(Files.toByteArray(file))
      (file.getName, fileBase64)
    }.toMap
    val localFilesSecret = new SecretBuilder()
      .withNewMetadata()
        .withName(secretName)
        .endMetadata()
      .withData(localFileBase64Contents.asJava)
      .build()
    Seq(localFilesSecret)
  }

  def allFiles(): Seq[String]
}
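
To make the step's output concrete: getAdditionalKubernetesResources emits one Secret whose data keys are file basenames and whose values are base64 over the raw bytes, which is exactly what the unit test further down decodes. A small sketch of that encoding in isolation, reusing the test fixture contents "a" and "b":

    import java.nio.charset.StandardCharsets

    import com.google.common.io.BaseEncoding

    // Secret data values are plain base64 over the file bytes:
    val encodedA = BaseEncoding.base64().encode("a".getBytes(StandardCharsets.UTF_8))
    // encodedA == "YQ==", which the suite's decodeToUtf8 helper turns back into "a".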

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala

Lines changed: 2 additions & 1 deletion

@@ -48,7 +48,8 @@ private[spark] class KubernetesDriverBuilder {
       new DriverCommandFeatureStep(conf),
       new HadoopConfDriverFeatureStep(conf),
       new KerberosConfDriverFeatureStep(conf),
-      new PodTemplateConfigMapStep(conf))
+      new PodTemplateConfigMapStep(conf),
+      new MountLocalDriverFilesFeatureStep(conf))
 
     val spec = KubernetesDriverSpec(
       initialPod,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala

Lines changed: 2 additions & 1 deletion

@@ -44,7 +44,8 @@ private[spark] class KubernetesExecutorBuilder {
       new MountSecretsFeatureStep(conf),
       new EnvSecretsFeatureStep(conf),
       new LocalDirsFeatureStep(conf),
-      new MountVolumesFeatureStep(conf))
+      new MountVolumesFeatureStep(conf),
+      new MountLocalExecutorFilesFeatureStep(conf))
 
     features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
   }

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala

Lines changed: 2 additions & 0 deletions

@@ -108,6 +108,8 @@ abstract class PodBuilderSuite extends SparkFunSuite {
     assert(container.getTerminationMessagePath === "termination-message-path")
     assert(container.getTerminationMessagePolicy === "termination-message-policy")
     assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "test-volume"))
+
+    assert(container.getVolumeMounts.asScala.exists(_.getName == "submitted-files"))
   }
 
   private def podWithSupportedFeatures(): Pod = {
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountLocalFilesFeatureStepSuite.scala

Lines changed: 109 additions & 0 deletions

@@ -0,0 +1,109 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s.features

import java.io.File

import scala.collection.JavaConverters._

import com.google.common.base.Charsets
import com.google.common.io.{BaseEncoding, Files}
import io.fabric8.kubernetes.api.model.Secret
import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
import org.apache.spark.util.Utils

class MountLocalFilesFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {

  private var kubernetesConf: KubernetesDriverConf = _
  private var sparkFiles: Seq[String] = _
  private var localFiles: Seq[File] = _
  private var stepUnderTest: MountLocalFilesFeatureStep = _

  before {
    val tempDir = Utils.createTempDir()
    val firstLocalFile = new File(tempDir, "file1.txt")
    Files.write("a", firstLocalFile, Charsets.UTF_8)
    val secondLocalFile = new File(tempDir, "file2.txt")
    Files.write("b", secondLocalFile, Charsets.UTF_8)
    sparkFiles = Seq(
      firstLocalFile.getAbsolutePath,
      s"file://${secondLocalFile.getAbsolutePath}",
      "https://localhost:9000/file3.txt")
    localFiles = Seq(firstLocalFile, secondLocalFile)
    val sparkConf = new SparkConf(false)
      .set("spark.files", sparkFiles.mkString(","))
      .set(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET, "secret")
    kubernetesConf = new KubernetesDriverConf(
      sparkConf,
      "test-app",
      JavaMainAppResource(None),
      "main",
      Array.empty[String],
      Seq.empty)
    stepUnderTest = new MountLocalDriverFilesFeatureStep(kubernetesConf)
  }

  test("Attaches a secret volume and secret name.") {
    val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
    assert(configuredPod.pod.getSpec.getVolumes.size === 1)
    val volume = configuredPod.pod.getSpec.getVolumes.get(0)
    assert(volume.getName === "submitted-files")
    assert(volume.getSecret.getSecretName === "secret")
    assert(configuredPod.container.getVolumeMounts.size === 1)
    val volumeMount = configuredPod.container.getVolumeMounts.get(0)
    assert(volumeMount.getName === "submitted-files")
    assert(volumeMount.getMountPath === MOUNTED_FILES_SECRET_DIR)
    assert(configuredPod.container.getEnv.size === 1)
    val addedEnv = configuredPod.container.getEnv.get(0)
    assert(addedEnv.getName === ENV_MOUNTED_FILES_FROM_SECRET_DIR)
    assert(addedEnv.getValue === MOUNTED_FILES_SECRET_DIR)
  }

  test("Maps submitted files in the system properties.") {
    val resolvedSystemProperties = stepUnderTest.getAdditionalPodSystemProperties()
    val expectedSystemProperties = Map(
      "spark.files" ->
        Seq(
          s"$MOUNTED_FILES_SECRET_DIR/${localFiles(0).getName}",
          s"$MOUNTED_FILES_SECRET_DIR/${localFiles(1).getName}",
          sparkFiles(2)).mkString(","),
      EXECUTOR_SUBMITTED_SMALL_FILES_SECRET.key -> "secret")
    assert(resolvedSystemProperties === expectedSystemProperties)
  }

  test("Additional Kubernetes resources includes the mounted files secret.") {
    val secrets = stepUnderTest.getAdditionalKubernetesResources()
    assert(secrets.size === 1)
    assert(secrets(0).isInstanceOf[Secret])
    val secret = secrets(0).asInstanceOf[Secret]
    assert(secret.getMetadata.getName === "secret")
    val secretData = secret.getData.asScala
    assert(secretData.size === 2)
    assert(decodeToUtf8(secretData(localFiles(0).getName)) === "a")
    assert(decodeToUtf8(secretData(localFiles(1).getName)) === "b")
  }

  private def decodeToUtf8(str: String): String = {
    new String(BaseEncoding.base64().decode(str), Charsets.UTF_8)
  }
}

resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh

Lines changed: 4 additions & 0 deletions

@@ -64,6 +64,10 @@ if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
   SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH";
 fi
 
+if [ -n "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR" ]; then
+  cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .
+fi
+
 case "$1" in
   driver)
     shift 1
