Submission client redesign to use a step-based builder pattern (#365)
* Submission client redesign to use a step-based builder pattern.

This change overhauls the underlying architecture of the submission
client, but it is intended to preserve the existing behavior of Spark
applications entirely. Users should therefore find this to be an
invisible change.

The philosophy behind this design is to reconsider how the submission
process is broken down. It is built around the abstraction of
"submission steps": transformation functions that take the previous
state of the driver and return the new state of the driver. The
driver's state includes its Spark configurations and the Kubernetes
resources that will be used to deploy it.
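
As a rough sketch of that abstraction (the names below are illustrative,
not the actual classes introduced by this change; plain strings stand in
for the fabric8 Kubernetes model types):

// Illustrative sketch only.
// The driver's state: its Spark configurations plus the Kubernetes
// resources that will be used to deploy it.
private[spark] case class DriverSpec(
    sparkConf: Map[String, String],
    driverPod: String,
    otherKubernetesResources: Seq[String])

// A submission step: a transformation function from the previous state
// of the driver to the new state of the driver.
private[spark] trait SubmissionStep {
  def configureDriver(previousSpec: DriverSpec): DriverSpec
}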

Such a refactor moves away from a features-first API design, in which
different components are each designed around serving a set of
features. The previous design, for example, had a container files
resolver API object that returned different resolutions of the
dependencies added by the user. However, it was up to the main Client
to know how to invoke all of those APIs intelligently. As a result, the
API surface area of the file resolver became untenably large, and it
was not intuitive how it was meant to be used or extended.

This design changes the encapsulation layout: every module is now
responsible for changing the driver specification directly. An
orchestrator builds the correct chain of steps and hands it to the
client, which applies the steps in order, verbatim. The main client
then makes any final modifications that put the different pieces of the
driver together, particularly attaching the driver container itself to
the pod and applying the Spark configuration as command-line arguments.
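
Continuing the illustrative sketch above, the orchestrator and the
client would cooperate roughly as follows:

// Illustrative sketch only: the orchestrator decides which steps apply
// and in what order; the client folds them over the initial state.
private[spark] class SubmissionStepsOrchestrator(steps: Seq[SubmissionStep]) {
  def buildAllSteps(): Seq[SubmissionStep] = steps
}

private[spark] class Client(orchestrator: SubmissionStepsOrchestrator) {
  def run(initialSpec: DriverSpec): DriverSpec = {
    val resolvedSpec = orchestrator.buildAllSteps()
      .foldLeft(initialSpec) { (spec, step) => step.configureDriver(spec) }
    // Final assembly would happen here: attach the driver container to
    // the pod and apply the Spark configuration as command-line arguments.
    resolvedSpec
  }
}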

* Add a unit test for BaseSubmissionStep.

* Add unit test for kubernetes credentials mounting.

* Add unit test for InitContainerBootstrapStep.

* Unit tests for the init-container.

* Add a unit test for DependencyResolutionStep.

* Further modifications to init-container unit tests.

* Use of the resolver in PythonStep, and unit tests for PythonStep.

* Refactoring of init-container unit tests and PythonStep resolver logic.

* Add unit test for KubernetesSubmissionStepsOrchestrator.

* Refactoring, and addition of secret trustStore/cert checks, in a SubmissionStepSuite.

* Added SparkPodInitContainerBootstrapSuite.

* Added InitContainerResourceStagingServerSecretPluginSuite.

* Style fixes in unit tests.

* Extremely minor style fix in variable naming.

* Address comments.

* Rename class for consistency.

* Attempt to make spacing consistent.

Multi-line methods should have four-space indentation for arguments that
aren't on the same line as the method call itself... but this is
difficult to do consistently given how IDEs handle Scala multi-line indentation
in most cases.
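
For reference, the intended convention looks roughly like this (a
hypothetical method, for illustration only):

object IndentationExample {
  // Arguments that don't fit on the line that starts the definition get
  // four-space indentation relative to the start of the `def`.
  def buildDriverPodName(
      appName: String,
      appId: String): String = {
    s"$appName-$appId-driver"
  }
}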
mccheah authored and foxish committed Jul 24, 2017
1 parent 442490a commit fd30c5d
Showing 56 changed files with 2,946 additions and 2,911 deletions.
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (9 additions & 6 deletions)

@@ -637,14 +637,17 @@ object SparkSubmit extends CommandLineUtils {
     if (isKubernetesCluster) {
       childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
       if (args.isPython) {
-        childArgs += args.primaryResource
-        childArgs += "org.apache.spark.deploy.PythonRunner"
-        childArgs += args.pyFiles
+        childArgs ++= Array("--primary-py-file", args.primaryResource)
+        childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
+        childArgs ++= Array("--other-py-files", args.pyFiles)
       } else {
-        childArgs += args.primaryResource
-        childArgs += args.mainClass
+        childArgs ++= Array("--primary-java-resource", args.primaryResource)
+        childArgs ++= Array("--main-class", args.mainClass)
       }
-      childArgs ++= args.childArgs
+      args.childArgs.foreach { arg =>
+        childArgs += "--arg"
+        childArgs += arg
+      }
     }

     // Load any properties specified through --conf and the default properties file
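
With the flags above, the arguments handed to
org.apache.spark.deploy.kubernetes.submit.Client are named rather than
positional. For a Java application they would now look roughly like the
following (illustrative values only; the flag names come from the diff
above):

  --primary-java-resource local:///opt/spark/examples/jars/examples.jar
  --main-class org.apache.spark.examples.SparkPi
  --arg 100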
InitContainerResourceStagingServerSecretPlugin.scala

@@ -16,7 +16,7 @@
  */
 package org.apache.spark.deploy.kubernetes

-import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder, Secret}

 import org.apache.spark.deploy.kubernetes.constants._

@@ -27,13 +27,13 @@ private[spark] trait InitContainerResourceStagingServerSecretPlugin {
    * from a resource staging server.
    */
   def mountResourceStagingServerSecretIntoInitContainer(
-      initContainer: ContainerBuilder): ContainerBuilder
+      initContainer: Container): Container

   /**
    * Configure the pod to attach a Secret volume which hosts secret files allowing the
    * init-container to retrieve dependencies from the resource staging server.
    */
-  def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder
+  def addResourceStagingServerSecretVolumeToPod(basePod: Pod): Pod
 }

 private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
@@ -42,21 +42,25 @@ private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
     extends InitContainerResourceStagingServerSecretPlugin {

   override def mountResourceStagingServerSecretIntoInitContainer(
-      initContainer: ContainerBuilder): ContainerBuilder = {
-    initContainer.addNewVolumeMount()
-      .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
-      .withMountPath(initContainerSecretMountPath)
-      .endVolumeMount()
+      initContainer: Container): Container = {
+    new ContainerBuilder(initContainer)
+      .addNewVolumeMount()
+        .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
+        .withMountPath(initContainerSecretMountPath)
+        .endVolumeMount()
+      .build()
   }

-  override def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder = {
-    basePod.editSpec()
-      .addNewVolume()
-        .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
-        .withNewSecret()
-          .withSecretName(initContainerSecretName)
-          .endSecret()
-        .endVolume()
-      .endSpec()
+  override def addResourceStagingServerSecretVolumeToPod(basePod: Pod): Pod = {
+    new PodBuilder(basePod)
+      .editSpec()
+        .addNewVolume()
+          .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
+          .withNewSecret()
+            .withSecretName(initContainerSecretName)
+            .endSecret()
+          .endVolume()
+        .endSpec()
+      .build()
   }
 }
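
Both methods now take and return built model objects rather than
mutable builders, so each call is a side-effect-free transformation. A
hedged usage sketch, assuming `plugin` is an
InitContainerResourceStagingServerSecretPluginImpl and `detached` is an
instance of the PodWithDetachedInitContainer case class added below:

  // Each call returns a new object; the inputs are left untouched.
  val securedInitContainer: Container =
    plugin.mountResourceStagingServerSecretIntoInitContainer(detached.initContainer)
  val podWithSecretVolume: Pod =
    plugin.addResourceStagingServerSecretVolumeToPod(detached.pod)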
PodWithDetachedInitContainer.scala (new file)

@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+import io.fabric8.kubernetes.api.model.{Container, Pod}
+
+private[spark] case class PodWithDetachedInitContainer(
+    pod: Pod,
+    initContainer: Container,
+    mainContainer: Container)
SparkPodInitContainerBootstrap.scala

@@ -19,19 +19,25 @@ package org.apache.spark.deploy.kubernetes
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}

 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.{ContainerNameEqualityPredicate, InitContainerUtil}

 /**
  * This is separated out from the init-container steps API because this component can be reused to
  * set up the init-container for executors as well.
  */
 private[spark] trait SparkPodInitContainerBootstrap {
   /**
    * Bootstraps an init-container that downloads dependencies to be used by a main container.
    * Note that this primarily assumes that the init-container's configuration is being provided
    * by a ConfigMap that was installed by some other component; that is, the implementation
    * here makes no assumptions about how the init-container is specifically configured. For
    * example, this class is unaware if the init-container is fetching remote dependencies or if
-   * it is fetching dependencies from a resource staging server.
+   * it is fetching dependencies from a resource staging server. Additionally, the container itself
+   * is not actually attached to the pod, but the init container is returned so it can be attached
+   * by InitContainerUtil after the caller has decided to make any changes to it.
    */
   def bootstrapInitContainerAndVolumes(
-      mainContainerName: String, originalPodSpec: PodBuilder): PodBuilder
+      originalPodWithUnattachedInitContainer: PodWithDetachedInitContainer)
+    : PodWithDetachedInitContainer
 }

 private[spark] class SparkPodInitContainerBootstrapImpl(
@@ -41,13 +47,11 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
     filesDownloadPath: String,
     downloadTimeoutMinutes: Long,
     initContainerConfigMapName: String,
-    initContainerConfigMapKey: String,
-    resourceStagingServerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
+    initContainerConfigMapKey: String)
   extends SparkPodInitContainerBootstrap {

   override def bootstrapInitContainerAndVolumes(
-      mainContainerName: String,
-      originalPodSpec: PodBuilder): PodBuilder = {
+      podWithDetachedInitContainer: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
     val sharedVolumeMounts = Seq[VolumeMount](
       new VolumeMountBuilder()
         .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
@@ -58,7 +62,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
         .withMountPath(filesDownloadPath)
         .build())

-    val initContainer = new ContainerBuilder()
+    val initContainer = new ContainerBuilder(podWithDetachedInitContainer.initContainer)
       .withName(s"spark-init")
       .withImage(initContainerImage)
       .withImagePullPolicy(dockerImagePullPolicy)
@@ -68,11 +72,8 @@
         .endVolumeMount()
       .addToVolumeMounts(sharedVolumeMounts: _*)
       .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
-    val resolvedInitContainer = resourceStagingServerSecretPlugin.map { plugin =>
-      plugin.mountResourceStagingServerSecretIntoInitContainer(initContainer)
-    }.getOrElse(initContainer).build()
-    val podWithBasicVolumes = InitContainerUtil.appendInitContainer(
-      originalPodSpec, resolvedInitContainer)
+      .build()
+    val podWithBasicVolumes = new PodBuilder(podWithDetachedInitContainer.pod)
       .editSpec()
       .addNewVolume()
         .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
@@ -92,17 +93,20 @@
         .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
         .withEmptyDir(new EmptyDirVolumeSource())
         .endVolume()
-      .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
-        .addToVolumeMounts(sharedVolumeMounts: _*)
-        .addNewEnv()
-          .withName(ENV_MOUNTED_FILES_DIR)
-          .withValue(filesDownloadPath)
-          .endEnv()
-        .endContainer()
       .endSpec()
-    resourceStagingServerSecretPlugin.map { plugin =>
-      plugin.addResourceStagingServerSecretVolumeToPod(podWithBasicVolumes)
-    }.getOrElse(podWithBasicVolumes)
+      .build()
+    val mainContainerWithMountedFiles = new ContainerBuilder(
+        podWithDetachedInitContainer.mainContainer)
+      .addToVolumeMounts(sharedVolumeMounts: _*)
+      .addNewEnv()
+        .withName(ENV_MOUNTED_FILES_DIR)
+        .withValue(filesDownloadPath)
+        .endEnv()
+      .build()
+    PodWithDetachedInitContainer(
+      podWithBasicVolumes,
+      initContainer,
+      mainContainerWithMountedFiles)
   }

 }
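
Putting the pieces together, a hedged sketch of a caller's flow through
the new API. Here `basePod`, `baseInitContainer`, `mainContainer`,
`bootstrap`, and `plugin` are assumed to exist already, and
`attachInitContainer` stands in for the InitContainerUtil attachment
logic that is not part of this diff:

  // The bootstrap threads the detached init-container through without
  // attaching it, so the caller can keep modifying it.
  val bootstrapped = bootstrap.bootstrapInitContainerAndVolumes(
      PodWithDetachedInitContainer(basePod, baseInitContainer, mainContainer))
  // For example, mount the resource staging server secret before attaching:
  val finalInitContainer = plugin.mountResourceStagingServerSecretIntoInitContainer(
      bootstrapped.initContainer)
  val finalPod = attachInitContainer(bootstrapped.pod, finalInitContainer)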