Skip to content

Commit 5499f6d

Browse files
committed
Submission client redesign to use a step-based builder pattern.
This change overhauls the underlying architecture of the submission client, but it is intended to entirely preserve existing behavior of Spark applications. Therefore users will find this to be an invisible change. The philosophy behind this design is to reconsider the breakdown of the submission process. It operates off the abstraction of "submission steps", which are transformation functions that take the previous state of the driver and return the new state of the driver. The driver's state includes its Spark configurations and the Kubernetes resources that will be used to deploy it. Such a refactor moves away from a features-first API design, which considers different containers to serve a set of features. The previous design, for example, had a container files resolver API object that returned different resolutions of the dependencies added by the user. However, it was up to the main Client to know how to intelligently invoke all of those APIs. Therefore the API surface area of the file resolver became untenably large, and it was not intuitive how it was to be used or extended. This design changes the encapsulation layout; every module is now responsible for changing the driver specification directly. An orchestrator builds the correct chain of steps and hands it to the client, which then calls it verbatim. The main client then makes any final modifications that put the different pieces of the driver together, particularly to attach the driver container itself to the pod and to apply the Spark configuration as command-line arguments.
1 parent 6acab03 commit 5499f6d

File tree

45 files changed

+1595
-2865
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

45 files changed

+1595
-2865
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,14 +621,22 @@ object SparkSubmit {
621621
if (isKubernetesCluster) {
622622
childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
623623
if (args.isPython) {
624+
childArgs += "--py-file"
624625
childArgs += args.primaryResource
626+
childArgs += "--main-class"
625627
childArgs += "org.apache.spark.deploy.PythonRunner"
628+
childArgs += "--other-py-files"
626629
childArgs += args.pyFiles
627630
} else {
631+
childArgs += "--primary-java-resource"
628632
childArgs += args.primaryResource
633+
childArgs += "--main-class"
629634
childArgs += args.mainClass
630635
}
631-
childArgs ++= args.childArgs
636+
args.childArgs.foreach { arg =>
637+
childArgs += "--arg"
638+
childArgs += arg
639+
}
632640
}
633641

634642
// Load any properties specified through --conf and the default properties file

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/InitContainerResourceStagingServerSecretPlugin.scala

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.spark.deploy.kubernetes
1818

19-
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret}
19+
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder, Secret}
2020

2121
import org.apache.spark.deploy.kubernetes.constants._
2222

@@ -27,13 +27,13 @@ private[spark] trait InitContainerResourceStagingServerSecretPlugin {
2727
* from a resource staging server.
2828
*/
2929
def mountResourceStagingServerSecretIntoInitContainer(
30-
initContainer: ContainerBuilder): ContainerBuilder
30+
initContainer: Container): Container
3131

3232
/**
3333
* Configure the pod to attach a Secret volume which hosts secret files allowing the
3434
* init-container to retrieve dependencies from the resource staging server.
3535
*/
36-
def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder
36+
def addResourceStagingServerSecretVolumeToPod(basePod: Pod): Pod
3737
}
3838

3939
private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
@@ -42,21 +42,25 @@ private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
4242
extends InitContainerResourceStagingServerSecretPlugin {
4343

4444
override def mountResourceStagingServerSecretIntoInitContainer(
45-
initContainer: ContainerBuilder): ContainerBuilder = {
46-
initContainer.addNewVolumeMount()
47-
.withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
48-
.withMountPath(initContainerSecretMountPath)
49-
.endVolumeMount()
45+
initContainer: Container): Container = {
46+
new ContainerBuilder(initContainer)
47+
.addNewVolumeMount()
48+
.withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
49+
.withMountPath(initContainerSecretMountPath)
50+
.endVolumeMount()
51+
.build()
5052
}
5153

52-
override def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder = {
53-
basePod.editSpec()
54-
.addNewVolume()
55-
.withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
56-
.withNewSecret()
57-
.withSecretName(initContainerSecretName)
58-
.endSecret()
59-
.endVolume()
60-
.endSpec()
54+
override def addResourceStagingServerSecretVolumeToPod(basePod: Pod): Pod = {
55+
new PodBuilder(basePod)
56+
.editSpec()
57+
.addNewVolume()
58+
.withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
59+
.withNewSecret()
60+
.withSecretName(initContainerSecretName)
61+
.endSecret()
62+
.endVolume()
63+
.endSpec()
64+
.build()
6165
}
6266
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes
18+
19+
import io.fabric8.kubernetes.api.model.{Container, Pod}
20+
21+
private[spark] case class PodWithDetachedInitContainer(
22+
pod: Pod,
23+
initContainer: Container,
24+
mainContainer: Container)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrap.scala

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,25 @@ package org.apache.spark.deploy.kubernetes
1919
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}
2020

2121
import org.apache.spark.deploy.kubernetes.constants._
22-
import org.apache.spark.deploy.kubernetes.submit.{ContainerNameEqualityPredicate, InitContainerUtil}
2322

23+
/**
24+
* This is separated out from the init-container steps API because this component can be reused to
25+
* set up the init-container for executors as well.
26+
*/
2427
private[spark] trait SparkPodInitContainerBootstrap {
2528
/**
2629
* Bootstraps an init-container that downloads dependencies to be used by a main container.
2730
* Note that this primarily assumes that the init-container's configuration is being provided
2831
* by a ConfigMap that was installed by some other component; that is, the implementation
2932
* here makes no assumptions about how the init-container is specifically configured. For
3033
* example, this class is unaware if the init-container is fetching remote dependencies or if
31-
* it is fetching dependencies from a resource staging server.
34+
* it is fetching dependencies from a resource staging server. Additionally, the container itself
35+
* is not actually attached to the pod, but the init container is returned so it can be attached
36+
* by InitContainerUtil after the caller has decided to make any changes to it.
3237
*/
3338
def bootstrapInitContainerAndVolumes(
34-
mainContainerName: String, originalPodSpec: PodBuilder): PodBuilder
39+
originalPodWithUnattachedInitContainer: PodWithDetachedInitContainer)
40+
: PodWithDetachedInitContainer
3541
}
3642

3743
private[spark] class SparkPodInitContainerBootstrapImpl(
@@ -41,13 +47,12 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
4147
filesDownloadPath: String,
4248
downloadTimeoutMinutes: Long,
4349
initContainerConfigMapName: String,
44-
initContainerConfigMapKey: String,
45-
resourceStagingServerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
50+
initContainerConfigMapKey: String)
4651
extends SparkPodInitContainerBootstrap {
4752

4853
override def bootstrapInitContainerAndVolumes(
49-
mainContainerName: String,
50-
originalPodSpec: PodBuilder): PodBuilder = {
54+
originalPodWithUnattachedInitContainer: PodWithDetachedInitContainer)
55+
: PodWithDetachedInitContainer = {
5156
val sharedVolumeMounts = Seq[VolumeMount](
5257
new VolumeMountBuilder()
5358
.withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
@@ -58,7 +63,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
5863
.withMountPath(filesDownloadPath)
5964
.build())
6065

61-
val initContainer = new ContainerBuilder()
66+
val initContainer = new ContainerBuilder(originalPodWithUnattachedInitContainer.initContainer)
6267
.withName(s"spark-init")
6368
.withImage(initContainerImage)
6469
.withImagePullPolicy(dockerImagePullPolicy)
@@ -68,11 +73,8 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
6873
.endVolumeMount()
6974
.addToVolumeMounts(sharedVolumeMounts: _*)
7075
.addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
71-
val resolvedInitContainer = resourceStagingServerSecretPlugin.map { plugin =>
72-
plugin.mountResourceStagingServerSecretIntoInitContainer(initContainer)
73-
}.getOrElse(initContainer).build()
74-
val podWithBasicVolumes = InitContainerUtil.appendInitContainer(
75-
originalPodSpec, resolvedInitContainer)
76+
.build()
77+
val podWithBasicVolumes = new PodBuilder(originalPodWithUnattachedInitContainer.pod)
7678
.editSpec()
7779
.addNewVolume()
7880
.withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
@@ -92,17 +94,20 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
9294
.withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
9395
.withEmptyDir(new EmptyDirVolumeSource())
9496
.endVolume()
95-
.editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
96-
.addToVolumeMounts(sharedVolumeMounts: _*)
97-
.addNewEnv()
98-
.withName(ENV_MOUNTED_FILES_DIR)
99-
.withValue(filesDownloadPath)
100-
.endEnv()
101-
.endContainer()
10297
.endSpec()
103-
resourceStagingServerSecretPlugin.map { plugin =>
104-
plugin.addResourceStagingServerSecretVolumeToPod(podWithBasicVolumes)
105-
}.getOrElse(podWithBasicVolumes)
98+
.build()
99+
val mainContainerWithMountedFiles = new ContainerBuilder(
100+
originalPodWithUnattachedInitContainer.mainContainer)
101+
.addToVolumeMounts(sharedVolumeMounts: _*)
102+
.addNewEnv()
103+
.withName(ENV_MOUNTED_FILES_DIR)
104+
.withValue(filesDownloadPath)
105+
.endEnv()
106+
.build()
107+
PodWithDetachedInitContainer(
108+
podWithBasicVolumes,
109+
initContainer,
110+
mainContainerWithMountedFiles)
106111
}
107112

108113
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,14 @@ package object config extends Logging {
418418
.stringConf
419419
.createOptional
420420

421+
private[spark] val INIT_CONTAINER_REMOTE_PYSPARK_FILES =
422+
ConfigBuilder("spark.kubernetes.initcontainer.remotePyFiles")
423+
.doc("Comma-separated list of Python file URIs to download in the init-container. This is" +
424+
" calculated given the list of python files sent to spark-submit.")
425+
.internal()
426+
.stringConf
427+
.createOptional
428+
421429
private[spark] val INIT_CONTAINER_DOCKER_IMAGE =
422430
ConfigBuilder("spark.kubernetes.initcontainer.docker.image")
423431
.doc("Image for the driver and executor's init-container that downloads dependencies.")

0 commit comments

Comments
 (0)