This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Submission client redesign to use a step-based builder pattern #365

Merged 20 commits on Jul 14, 2017
Commits
9ff8c69
Submission client redesign to use a step-based builder pattern.
mccheah Jul 5, 2017
c23bb4c
Add a unit test for BaseSubmissionStep.
mccheah Jul 5, 2017
f8d28b8
Add unit test for kubernetes credentials mounting.
mccheah Jul 5, 2017
90f77fb
Add unit test for InitContainerBootstrapStep.
mccheah Jul 6, 2017
01b8d18
unit tests for initContainer
ifilonenko Jul 6, 2017
db1f0c2
Add a unit test for DependencyResolutionStep.
mccheah Jul 6, 2017
20d9a90
further modifications to InitContainer unit tests
ifilonenko Jul 6, 2017
1fb49a0
Merge branch 'submission-steps-refactor' of https://github.com/apache…
ifilonenko Jul 6, 2017
11c95e9
Use of resolver in PythonStep and unit tests for PythonStep
ifilonenko Jul 6, 2017
80a186d
refactoring of init unit tests and pythonstep resolver logic
ifilonenko Jul 6, 2017
1f58411
Add unit test for KubernetesSubmissionStepsOrchestrator.
mccheah Jul 6, 2017
31985a6
refactoring and addition of secret trustStore+Cert checks in a Submis…
ifilonenko Jul 6, 2017
9e002aa
added SparkPodInitContainerBootstrapSuite
ifilonenko Jul 7, 2017
61a7561
Added InitContainerResourceStagingServerSecretPluginSuite
ifilonenko Jul 7, 2017
fa78aad
style in Unit tests
ifilonenko Jul 7, 2017
c477a0c
extremely minor style fix in variable naming
ifilonenko Jul 7, 2017
5a76328
Address comments.
mccheah Jul 14, 2017
16adf71
Merge branch 'submission-steps-refactor' of github.com:apache-spark-o…
mccheah Jul 14, 2017
ed52eee
Rename class for consistency.
mccheah Jul 14, 2017
397312c
Attempt to make spacing consistent.
mccheah Jul 14, 2017
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (9 additions, 6 deletions)
@@ -621,14 +621,17 @@ object SparkSubmit {
     if (isKubernetesCluster) {
       childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
       if (args.isPython) {
-        childArgs += args.primaryResource
-        childArgs += "org.apache.spark.deploy.PythonRunner"
-        childArgs += args.pyFiles
+        childArgs ++= Array("--primary-py-file", args.primaryResource)
ifilonenko (Member) commented on Jun 30, 2017:

:) thanks for the --primary-py-file

Another member commented:

Wondering if it makes sense for (childArgs, childClasspath, sysProps, childMainClass) to be modeled as a case class with builder pattern, for similar reasons. Tangentially, mutable collections can be hazardous if not handled carefully - a case-class pattern using immutable collections might be worthwhile, given the complexity of the environment constructions.

mccheah (Author) replied:

We don't have many better options here because SparkSubmit creates the submission client implementation reflectively and only expects the submission client to have a main method with a list of arguments. This is to account for the fact that the core module of Spark doesn't have a compile-time dependency on the specific submission client implementations for the different cluster managers.

mccheah (Author) replied:

In Client.scala we parse the arguments array into a case class and report errors when fields are missing.
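For illustration, here is a minimal sketch of that parsing approach. The flag names follow the new childArgs in this diff, but the case class, its fields, and the error handling are hypothetical stand-ins, not the actual Client.scala code:

// Hypothetical sketch: parse the flag-style childArgs built by SparkSubmit
// into an immutable case class, failing fast on missing required fields.
// Only a subset of the flags above is handled; names are illustrative.
private[spark] case class ClientArguments(
    mainAppResource: String,
    mainClass: String,
    driverArgs: Seq[String])

private[spark] object ClientArguments {
  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
    var mainAppResource: Option[String] = None
    var mainClass: Option[String] = None
    val driverArgs = scala.collection.mutable.ArrayBuffer.empty[String]
    // Arguments arrive as flag/value pairs, so walk them two at a time.
    args.grouped(2).foreach {
      case Array("--primary-java-resource", value) => mainAppResource = Some(value)
      case Array("--primary-py-file", value) => mainAppResource = Some(value)
      case Array("--main-class", value) => mainClass = Some(value)
      case Array("--arg", value) => driverArgs += value
      case other =>
        throw new IllegalArgumentException(s"Unexpected arguments: ${other.mkString(" ")}")
    }
    ClientArguments(
      mainAppResource.getOrElse(
        throw new IllegalArgumentException("Must specify a primary resource")),
      mainClass.getOrElse(
        throw new IllegalArgumentException("Must specify a main class")),
      driverArgs.toList)
  }
}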

childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
childArgs ++= Array("--other-py-files", args.pyFiles)
} else {
childArgs += args.primaryResource
childArgs += args.mainClass
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
}
args.childArgs.foreach { arg =>
childArgs += "--arg"
childArgs += arg
}
childArgs ++= args.childArgs
}

     // Load any properties specified through --conf and the default properties file
InitContainerResourceStagingServerSecretPlugin.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.deploy.kubernetes

-import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder, Secret}

import org.apache.spark.deploy.kubernetes.constants._

@@ -27,13 +27,13 @@ private[spark] trait InitContainerResourceStagingServerSecretPlugin {
    * from a resource staging server.
    */
   def mountResourceStagingServerSecretIntoInitContainer(
-      initContainer: ContainerBuilder): ContainerBuilder
+      initContainer: Container): Container
 
   /**
    * Configure the pod to attach a Secret volume which hosts secret files allowing the
    * init-container to retrieve dependencies from the resource staging server.
    */
-  def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder
+  def addResourceStagingServerSecretVolumeToPod(basePod: Pod): Pod
 }

 private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
@@ -42,21 +42,25 @@ private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
   extends InitContainerResourceStagingServerSecretPlugin {

   override def mountResourceStagingServerSecretIntoInitContainer(
-      initContainer: ContainerBuilder): ContainerBuilder = {
-    initContainer.addNewVolumeMount()
-      .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
-      .withMountPath(initContainerSecretMountPath)
-      .endVolumeMount()
+      initContainer: Container): Container = {
+    new ContainerBuilder(initContainer)
+      .addNewVolumeMount()
+        .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
+        .withMountPath(initContainerSecretMountPath)
+        .endVolumeMount()
+      .build()
   }

-  override def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder = {
-    basePod.editSpec()
-      .addNewVolume()
-        .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
-        .withNewSecret()
-          .withSecretName(initContainerSecretName)
-          .endSecret()
-        .endVolume()
-      .endSpec()
+  override def addResourceStagingServerSecretVolumeToPod(basePod: Pod): Pod = {
+    new PodBuilder(basePod)
+      .editSpec()
+        .addNewVolume()
+          .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
+          .withNewSecret()
+            .withSecretName(initContainerSecretName)
+            .endSecret()
+          .endVolume()
+        .endSpec()
+      .build()
   }
}
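
Since the plugin now consumes and returns immutable Pod and Container values instead of mutating shared builders, callers can treat each method as a plain function and apply it conditionally. A minimal sketch of such a call site, assuming a base pod and an optional plugin are in scope; it mirrors the optional application that previously lived inside the bootstrap, shown further below:

// Sketch only: apply the plugin as an ordinary Pod => Pod function.
// `basePod` and `maybeSecretPlugin` are assumed inputs, not names from this PR.
import io.fabric8.kubernetes.api.model.Pod

def withStagingServerSecretVolume(
    basePod: Pod,
    maybeSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin]): Pod = {
  maybeSecretPlugin
    .map(_.addResourceStagingServerSecretVolumeToPod(basePod))
    .getOrElse(basePod)
}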
PodWithDetachedInitContainer.scala (new file)
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{Container, Pod}

private[spark] case class PodWithDetachedInitContainer(
    pod: Pod,
    initContainer: Container,
    mainContainer: Container)
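
The case class lets each configuration step pass the pod and its not-yet-attached containers around as a single immutable value, returning a modified copy rather than mutating builders. A hypothetical step shape, for illustration only (the label-adding step is invented, not part of this PR):

// Hypothetical example of a step over PodWithDetachedInitContainer: returns a
// copy with an extra label on the pod, leaving both detached containers as-is.
import io.fabric8.kubernetes.api.model.PodBuilder

private[spark] class AddPodLabelStep(key: String, value: String) {
  def apply(original: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
    val labeledPod = new PodBuilder(original.pod)
      .editOrNewMetadata()
        .addToLabels(key, value)
        .endMetadata()
      .build()
    original.copy(pod = labeledPod)
  }
}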
SparkPodInitContainerBootstrap.scala
@@ -19,19 +19,25 @@ package org.apache.spark.deploy.kubernetes
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}
 
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.{ContainerNameEqualityPredicate, InitContainerUtil}
 
+/**
+ * This is separated out from the init-container steps API because this component can be reused to
+ * set up the init-container for executors as well.
+ */
 private[spark] trait SparkPodInitContainerBootstrap {
   /**
    * Bootstraps an init-container that downloads dependencies to be used by a main container.
    * Note that this primarily assumes that the init-container's configuration is being provided
    * by a ConfigMap that was installed by some other component; that is, the implementation
    * here makes no assumptions about how the init-container is specifically configured. For
    * example, this class is unaware if the init-container is fetching remote dependencies or if
-   * it is fetching dependencies from a resource staging server.
+   * it is fetching dependencies from a resource staging server. Additionally, the container itself
+   * is not actually attached to the pod, but the init container is returned so it can be attached
+   * by InitContainerUtil after the caller has decided to make any changes to it.
    */
   def bootstrapInitContainerAndVolumes(
-      mainContainerName: String, originalPodSpec: PodBuilder): PodBuilder
+      originalPodWithUnattachedInitContainer: PodWithDetachedInitContainer)
+    : PodWithDetachedInitContainer
 }

@@ -41,13 +47,12 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
     filesDownloadPath: String,
     downloadTimeoutMinutes: Long,
     initContainerConfigMapName: String,
-    initContainerConfigMapKey: String,
-    resourceStagingServerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
+    initContainerConfigMapKey: String)
   extends SparkPodInitContainerBootstrap {
 
   override def bootstrapInitContainerAndVolumes(
-      mainContainerName: String,
-      originalPodSpec: PodBuilder): PodBuilder = {
+      originalPodWithUnattachedInitContainer: PodWithDetachedInitContainer)
+    : PodWithDetachedInitContainer = {
A reviewer commented:

this syntax looks a bit weird -- should the : be on the prior line?

mccheah (Author) replied:

It doesn't fit on one line in under 100 characters.

     val sharedVolumeMounts = Seq[VolumeMount](
       new VolumeMountBuilder()
         .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
@@ -58,7 +63,7 @@
         .withMountPath(filesDownloadPath)
         .build())

-    val initContainer = new ContainerBuilder()
+    val initContainer = new ContainerBuilder(originalPodWithUnattachedInitContainer.initContainer)
       .withName(s"spark-init")
       .withImage(initContainerImage)
       .withImagePullPolicy(dockerImagePullPolicy)
@@ -68,11 +73,8 @@
         .endVolumeMount()
       .addToVolumeMounts(sharedVolumeMounts: _*)
       .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
-    val resolvedInitContainer = resourceStagingServerSecretPlugin.map { plugin =>
-      plugin.mountResourceStagingServerSecretIntoInitContainer(initContainer)
-    }.getOrElse(initContainer).build()
-    val podWithBasicVolumes = InitContainerUtil.appendInitContainer(
-      originalPodSpec, resolvedInitContainer)
+      .build()
+    val podWithBasicVolumes = new PodBuilder(originalPodWithUnattachedInitContainer.pod)
       .editSpec()
         .addNewVolume()
           .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
@@ -92,17 +94,20 @@
           .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
           .withEmptyDir(new EmptyDirVolumeSource())
           .endVolume()
-        .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
-          .addToVolumeMounts(sharedVolumeMounts: _*)
-          .addNewEnv()
-            .withName(ENV_MOUNTED_FILES_DIR)
-            .withValue(filesDownloadPath)
-            .endEnv()
-          .endContainer()
         .endSpec()
-    resourceStagingServerSecretPlugin.map { plugin =>
-      plugin.addResourceStagingServerSecretVolumeToPod(podWithBasicVolumes)
-    }.getOrElse(podWithBasicVolumes)
+      .build()
+    val mainContainerWithMountedFiles = new ContainerBuilder(
+        originalPodWithUnattachedInitContainer.mainContainer)
+      .addToVolumeMounts(sharedVolumeMounts: _*)
+      .addNewEnv()
+        .withName(ENV_MOUNTED_FILES_DIR)
+        .withValue(filesDownloadPath)
+        .endEnv()
+      .build()
+    PodWithDetachedInitContainer(
+      podWithBasicVolumes,
+      initContainer,
+      mainContainerWithMountedFiles)
   }
 
 }
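
To make the contract in the trait's doc comment concrete, a caller might seed the bootstrap with an empty init-container, customize the returned container, and then attach it. A rough sketch: the appendInitContainer signature used here, (Pod, Container) => Pod, is an assumption for illustration, and re-attaching the returned main container is elided.

// Sketch of the intended call pattern. The InitContainerUtil helper signature
// is assumed; re-attaching the returned main container is elided for brevity.
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod}

def bootstrapDriverPod(
    bootstrap: SparkPodInitContainerBootstrap,
    basePod: Pod,
    driverContainer: Container): Pod = {
  val bootstrapped = bootstrap.bootstrapInitContainerAndVolumes(
    PodWithDetachedInitContainer(
      pod = basePod,
      initContainer = new ContainerBuilder().build(), // empty seed init-container
      mainContainer = driverContainer))
  // Callers may first customize bootstrapped.initContainer, e.g. by mounting
  // the resource staging server secret via the plugin shown earlier.
  InitContainerUtil.appendInitContainer(bootstrapped.pod, bootstrapped.initContainer)
}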
config.scala
@@ -418,6 +418,14 @@ package object config extends Logging {
       .stringConf
       .createOptional
 
+  private[spark] val INIT_CONTAINER_REMOTE_PYSPARK_FILES =
+    ConfigBuilder("spark.kubernetes.initcontainer.remotePyFiles")
A reviewer commented:

why is this newly-required from this refactor? I expected there to be no change in user-visible behavior

mccheah (Author) replied:

Good catch - I don't think this is necessary; this is an artifact of something I was trying before.

.doc("Comma-separated list of Python file URIs to download in the init-container. This is" +
" calculated given the list of python files sent to spark-submit.")
.internal()
.stringConf
.createOptional

   private[spark] val INIT_CONTAINER_DOCKER_IMAGE =
     ConfigBuilder("spark.kubernetes.initcontainer.docker.image")
       .doc("Image for the driver and executor's init-container that downloads dependencies.")
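
For context on how entries like these are consumed: .stringConf.createOptional produces an OptionalConfigEntry[String], which Spark's internal SparkConf.get(entry) accessor reads as an Option[String]. A small sketch, using the (reportedly unnecessary) entry above purely as an example of the mechanism:

// Illustrative only: read an optional internal config entry the way the
// submission client would; absent values need no sentinel handling.
import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._

def remotePySparkFiles(sparkConf: SparkConf): Seq[String] = {
  sparkConf.get(INIT_CONTAINER_REMOTE_PYSPARK_FILES)
    .map(_.split(",").toSeq)
    .getOrElse(Seq.empty)
}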