Submission client redesign to use a step-based builder pattern #365
Conversation
Rebuilt from #363.
@@ -27,15 +27,19 @@ private[spark] class PythonStep(
    filesDownloadPath: String) extends KubernetesSubmissionStep {

  override def prepareSubmission(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
    val resolvedOtherPyFilesString = if (otherPyFiles.isEmpty) {
@ifilonenko is this sufficient to cover the arguments handling? SPARK_DRIVER_ARGS is loaded in BaseSubmissionStep
already. Or is there more we need to do here?
Not exactly, it would need to be null unless you have Docker parse it, but Docker doesn't do if: else: blocks, so that's why this wouldn't be helpful per se.
Ok - but if we changed this to be the string "null" then that would suffice?
We can have the Dockerfile use an if-else block also - we do this in a number of places in the existing ones. It's just bash if-else syntax in the command.
I am testing that right now in a separate PR, but yes, it should. Passing in just null would, I believe, cause withValue() to raise an error. But I think that CMD PythonRunner PY_FILE "null" DRIVER_ARGS parses correctly.
if-else syntax doesn't exist? There is only if-then.
Bash supports if-then-else. It might be tricky to get it exactly right in the Dockerfile so maybe it's not worth it.
childArgs += "org.apache.spark.deploy.PythonRunner"
childArgs += "--other-py-files"
childArgs += args.pyFiles
childArgs ++= Array("--primary-py-file", args.primaryResource)
:) thanks for the --primary-py-file
@@ -27,7 +27,7 @@ private[spark] class MinikubeTestBackend extends IntegrationTestBackend {

  override def initialize(): Unit = {
    Minikube.startMinikube()
-   new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
+   // new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
Intentionally committed?
Nope good catch - was from local testing
Conflicts are likely from the style changes in the base PR. I expect that unless we have significant deviations functionality-wise in the Python implementation, we can resolve most if not all of the conflicts by just taking this branch.
@Mock
private var podWithDetachedInitContainer : PodWithDetachedInitContainer = _
@Mock
private var initContainerSpec : InitContainerSpec = _
Don't use a mock here - since it's a case class, the equivalent of a struct, it's fine to just use the implementation.
How would you simulate the .copy method tho?
We don't need to - since it's a plain case class we should operate under what its actual behavior is. We don't mock classes like scala.collection.List or java.util.Optional for similar reasons.
Agreed. I will just pass in an empty case class
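The point above can be sketched in plain Scala: a case class gives you construction, equality, and `.copy` for free, so a real instance works in tests where a mock would only add indirection. The field names below are simplified stand-ins for the PR's `InitContainerSpec`, not the actual class.

```scala
// A case class is an immutable value holder; tests can construct a real
// instance instead of mocking it.
case class InitContainerSpec(
    initContainerProperties: Map[String, String],
    additionalDriverSparkConf: Map[String, String])

val original = InitContainerSpec(Map.empty, Map.empty)

// .copy works out of the box; there is nothing to stub.
val updated = original.copy(
  initContainerProperties = Map("spark.kubernetes.initcontainer.enabled" -> "true"))
```

Passing an empty instance, as suggested, and letting steps `.copy` it forward exercises the real behavior the production code relies on.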
I resolved merge conflicts with the "ours" strategy since the only differences in the latest push that caused the conflicts were style changes. I'll rebase this branch to also be pointing to
99cccdc to 9ff8c69
private val STAGING_SERVER_URI = "http://localhost:8000"

test ("Contact resource staging server w/o TLS") {
  val SPARK_CONF = new SparkConf(true)
Call this sparkConf
val initSteps : Seq[InitContainerStep] = initContainerStepsOrchestrator.getInitContainerSteps()
assert(initSteps.length == 2)
assert( initSteps.map({
This is just more cleanly expressed as
assert(initSteps(0).isInstanceOf[BaseInitContainerStep])
assert(initSteps(1).isInstanceOf[SubmittedResourcesInitContainerStep])
instead of using pattern matching.
INIT_CONTAINER_CONFIG_MAP_NAME, INIT_CONTAINER_CONFIG_MAP_KEY, SPARK_CONF)
val initSteps : Seq[InitContainerStep] = initContainerStepsOrchestrator.getInitContainerSteps()
assert(initSteps.length == 1)
assert(initSteps.headOption.exists({
Similarly here, just use isInstanceOf.
@Mock
private var podAndInitContainerBootstrap : SparkPodInitContainerBootstrap = _
@Mock
private var podWithDetachedInitContainer : PodWithDetachedInitContainer = _
This is a case class so it shouldn't be a mock.
podAndInitContainerBootstrap)
val remoteJarsToDownload = KubernetesFileUtils.getOnlyRemoteFiles(SPARK_JARS)
val remoteFilesToDownload = KubernetesFileUtils.getOnlyRemoteFiles(SPARK_FILES)
assert(remoteJarsToDownload === List("hdfs://localhost:9000/app/jars/jar1.jar"))
Why this check? We aren't testing the functionality of KubernetesFileUtils here.
new Container(), new Container(), new Pod, Seq.empty[HasMetadata]
)
val returnContainerSpec = baseInitStep.prepareInitContainer(initContainerSpec)
assert(expectedTest.toSet.subsetOf(returnContainerSpec.initContainerProperties.toSet))
Check for an exact match. We don't want to be setting unexpected properties here.
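The concern here can be shown with two hypothetical property maps (the key names are invented for illustration): a subset assertion passes even when an unexpected property is present, while exact equality catches it.

```scala
// Why subsetOf can hide bugs: the returned map carries an extra,
// unexpected entry, yet the subset check still passes.
val expected = Map("jarsDownloadDir" -> "/var/spark-data/jars")
val returned = Map(
  "jarsDownloadDir" -> "/var/spark-data/jars",
  "unexpectedKey" -> "oops")

val subsetHolds = expected.toSet.subsetOf(returned.toSet) // passes despite the extra key
val exactHolds  = expected == returned                    // fails, flagging the extra key
```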
assert(remoteJarsToDownload === List("hdfs://localhost:9000/app/jars/jar1.jar"))
assert(remoteFilesToDownload === List("hdfs://localhost:9000/app/files/file1.txt"))
val expectedTest = Map(
  INIT_CONTAINER_JARS_DOWNLOAD_LOCATION.key -> JARS_DOWNLOAD_PATH,
Could be a better name for this - expectedDriverSparkConf, perhaps.
new Container(), new Container(), new Pod, Seq.empty[HasMetadata]
)
val returnContainerSpec = baseInitStep.prepareInitContainer(initContainerSpec)
assert(expectedTest.toSet.subsetOf(returnContainerSpec.initContainerProperties.toSet))
We also should be inspecting the properties of the pod and the containers. The mock of the SparkPodInitContainerBootstrap instance we pass in should modify the Kubernetes components in such a way that we can check for them afterwards. Basically we want to verify that the pod init container bootstrap was used to make changes to the pod and containers.
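The verification style suggested here can be sketched without a mocking framework at all: a hand-rolled stub (using invented minimal types, not the PR's actual Kubernetes model classes) leaves an observable fingerprint on the pod, and the test asserts on that fingerprint to prove the bootstrap was routed through.

```scala
// Minimal stand-in types for the pod and its detached init-container.
case class Pod(labels: Map[String, String])
case class PodWithInitContainer(pod: Pod, initContainer: String)

trait PodInitContainerBootstrap {
  def bootstrap(p: PodWithInitContainer): PodWithInitContainer
}

// Stub that marks the pod instead of returning a canned mock value, so the
// test can later check the mark to confirm the bootstrap was actually used.
object MarkingBootstrap extends PodInitContainerBootstrap {
  def bootstrap(p: PodWithInitContainer): PodWithInitContainer =
    p.copy(pod = p.pod.copy(labels = p.pod.labels + ("bootstrapped" -> "true")))
}

val result = MarkingBootstrap.bootstrap(PodWithInitContainer(Pod(Map.empty), "init"))
```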
.set(RESOURCE_STAGING_SERVER_URI, STAGING_SERVER_URI)

val initContainerStepsOrchestrator = new InitContainerStepsOrchestrator(
  NAMESPACE, APP_RESOURCE_PREFIX, SPARK_JARS, SPARK_FILES, JARS_DOWNLOAD_PATH,
One argument per line.
private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key"
private val STAGING_SERVER_URI = "http://localhost:8000"

test ("Contact resource staging server w/o TLS") {
Probably no need to mention TLS in the test description since it's largely irrelevant for what the test is actually checking.
driverContainer = new ContainerBuilder().build(),
driverSparkConf = submissionSparkConf.clone(),
otherKubernetesResources = Seq.empty[HasMetadata])
// This orchestrator determines which steps are necessary to take to resolve varying
This isn't really the orchestrator - the orchestrator has pre-determined these steps to run. Perhaps this comment can be moved?
f124840 to c23bb4c
childArgs += args.primaryResource
childArgs += "org.apache.spark.deploy.PythonRunner"
childArgs += args.pyFiles
childArgs ++= Array("--primary-py-file", args.primaryResource)
Wondering if it makes sense for (childArgs, childClasspath, sysProps, childMainClass) to be modeled as a case class with builder pattern, for similar reasons. Tangentially, mutable collections can be hazardous if not handled carefully - a case-class pattern using immutable collections might be worthwhile, given the complexity of the environment constructions.
We don't have many better options here because SparkSubmit creates the submission client implementation reflectively and only expects the submission client to have a main method with a list of arguments. This is to account for the fact that the core module of Spark doesn't have a compile time dependency on the specific submission client implementations for the different cluster managers.
In Client.scala we parse the arguments array into a case class and report on errors when fields are missing.
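The reviewer's suggestion above can be sketched as an immutable case class wrapping the four values, with small builder-style helpers instead of mutable buffers. The type and method names here are illustrative, not the actual SparkSubmit code.

```scala
// Immutable replacement for the mutable (childArgs, childClasspath,
// sysProps, childMainClass) tuple; each helper returns a new copy.
case class ChildSpec(
    childArgs: Seq[String] = Seq.empty,
    childClasspath: Seq[String] = Seq.empty,
    sysProps: Map[String, String] = Map.empty,
    childMainClass: String = "") {
  def withArgs(args: String*): ChildSpec = copy(childArgs = childArgs ++ args)
  def withSysProp(key: String, value: String): ChildSpec =
    copy(sysProps = sysProps + (key -> value))
}

val spec = ChildSpec(childMainClass = "org.apache.spark.deploy.PythonRunner")
  .withArgs("--primary-py-file", "main.py")
  .withArgs("--other-py-files", "dep.py")
```

Since every helper returns a fresh copy, partially built specs can never be observed mid-mutation, which is the hazard the reviewer is pointing at.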
rerun integration tests please
  INIT_CONTAINER_CONFIG_MAP_NAME,
  INIT_CONTAINER_CONFIG_MAP_KEY
)
private val returnedPodWithCont = sparkPodInit.bootstrapInitContainerAndVolumes(
Move these into the test method.
Sharing fields is nice for the object under test, but for the functionality we specifically are testing, it's more idiomatic to put the method calls into the test.
  initContainer = new Container(),
  mainContainer = new ContainerBuilder().withName(MAIN_CONTAINER_NAME).build())
)
private val expectedSharedMap = Map(
Clearer name, related to volumes
private val INIT_CONTAINER_SECRET_NAME = "init-secret"
private val INIT_CONTAINER_SECRET_MOUNT = "/tmp/secret"

private val initContainerRSSP = new InitContainerResourceStagingServerSecretPluginImpl(
initContainerSecretPlugin - no need for an acronym.
test("Volume Mount into InitContainer") {
  val returnedCont = initContainerRSSP.mountResourceStagingServerSecretIntoInitContainer(
    new ContainerBuilder().withName("init-container").build()
  )
This closing bracket should be on the previous line. There's a few places where this is done, so please fix the others also.
assert(returnedCont.getVolumeMounts.asScala.map(
  vm => (vm.getName, vm.getMountPath)) ===
  List((INIT_CONTAINER_SECRET_VOLUME_NAME, INIT_CONTAINER_SECRET_MOUNT))
)
Bracket on the previous line
  DOWNLOAD_TIMEOUT_MINUTES,
  INIT_CONTAINER_CONFIG_MAP_NAME,
  INIT_CONTAINER_CONFIG_MAP_KEY
)
Bracket on the previous line
private val expectedSharedMap = Map(
  JARS_DOWNLOAD_PATH -> INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME,
  FILES_DOWNLOAD_PATH -> INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME
)
Bracket on the previous line
import org.scalatest.BeforeAndAfter

class SubmittedResourcesInitContainerStepSuite extends SparkFunSuite with BeforeAndAfter {
  private def createTempFile(extension: String): String = {
Move this method to the bottom of the test class
    .addToLabels("mountedSecret", "true")
    .endMetadata()
    .withNewSpec().endSpec()
    .build()}})
Closing curly braces should be on their own lines.
LGTM
Wow this is so much more readable than before! I think the concept of steps and the orchestrator assembling steps has worked out really well -- this is much easier to follow and understand than before. Plus the tests seem much more straightforward than before. Great work!
Besides the little nits, I think adding a DriverSpecDeployer or something similar would be the biggest improvement from here. It would further extract logic to a testable place, and it is the natural follow-up to creating a spec: the next step should be to deploy the spec.
Again, love this change!
mainContainerName: String,
originalPodSpec: PodBuilder): PodBuilder = {
originalPodWithUnattachedInitContainer: PodWithDetachedInitContainer)
: PodWithDetachedInitContainer = {
this syntax looks a bit weird -- should the : be on the prior line?
It doesn't fit on one line in under 100 characters.
@@ -418,6 +418,14 @@ package object config extends Logging {
    .stringConf
    .createOptional

  private[spark] val INIT_CONTAINER_REMOTE_PYSPARK_FILES =
    ConfigBuilder("spark.kubernetes.initcontainer.remotePyFiles")
why is this newly-required from this refactor? I expected there to be no change in user-visible behavior
Good catch - I don't think this is necessary, this is an artifact of something I was trying before.
var mainClass: Option[String] = None
val driverArgs = mutable.Buffer.empty[String]
args.sliding(2).toList.collect {
  case Array("--primary-py-file", mainPyFile: String) =>
is this a new flag?
Yep - the contract for arguments sent to the child submission client class has changed.
why? I thought we wanted no user-visible changes?
Seems like a lot of these are to do the .sliding(2) thing
It's not user-visible because this class is proxied into from SparkSubmit.scala.
We shouldn't expect this class to be used directly.
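The `.sliding(2)` pattern discussed in this thread can be shown in isolation: `sliding(2)` yields every adjacent pair, and `collect` keeps only the windows whose first element is a recognized flag, so a value is never misread as a flag. The `--primary-py-file` flag mirrors the PR; the `--main-class` flag and surrounding variables are simplified for illustration.

```scala
// Simulated child-client argument array.
val args = Array("--primary-py-file", "main.py", "--main-class", "org.example.Main")

var mainPyFile: Option[String] = None
var mainClass: Option[String] = None

// Windows are: [--primary-py-file, main.py], [main.py, --main-class],
// [--main-class, org.example.Main]; collect skips the middle one since
// "main.py" matches no flag case.
args.sliding(2).toList.collect {
  case Array("--primary-py-file", value: String) => mainPyFile = Some(value)
  case Array("--main-class", value: String)      => mainClass = Some(value)
}

// Missing required fields are reported, as done in Client.scala.
require(mainPyFile.isDefined, "Main app resource must be defined.")
```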
  throw new RuntimeException(s"Unknown arguments: $other")
}
require(mainAppResource.isDefined,
  "Main app resource must be defined by either --py-file or --main-java-resource.")
do you mean --primary-py-file and --primary-java-resource here?
/**
 * Run command that initalizes a DriverSpec that will be updated
 * after each KubernetesSubmissionStep in the sequence that is passed in.
 * The final driver-spec will be used to build the Driver Container,
s/driver-spec/DriverSpec/ -- this is the only place it's hyphenated
 */
def main(args: Array[String]): Unit = {
  val parsedArguments = ClientArguments.fromCommandLineArgs(args)
  val sparkConf = new SparkConf()
comment that this reads from system properties?
and reorder to match order of run method params
The SparkConf constructor's Scaladoc states this sufficiently.
The fact that it loads from System Properties I mean.
/**
 * Entry point from SparkSubmit in spark-core
 *
 *
nit: extra newline
/**
 * For the collection of uris, resolves any files as follows:
 * - Files with scheme file:// are resolved to the download path
the given download path
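The resolution rule described in this Scaladoc can be sketched as a small function: `file://` (or scheme-less, or remote) URIs resolve to the container's download directory, while `local://` URIs are kept as container-local paths. The helper name and exact branching are illustrative, not the PR's actual KubernetesFileUtils API.

```scala
import java.io.File
import java.net.URI

def resolveFileUri(uri: String, downloadPath: String): String = {
  val parsed = URI.create(uri)
  Option(parsed.getScheme).getOrElse("file") match {
    // Already on the container's disk: strip the scheme, keep the path.
    case "local" => parsed.getPath
    // Submitted from the client or fetched remotely: it will land in the
    // shared download directory, so resolve to <downloadPath>/<fileName>.
    case _ => s"$downloadPath/${new File(parsed.getPath).getName}"
  }
}
```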
submissionSparkConf: SparkConf) {

// The resource name prefix is derived from the application name, making it easy to connect the
// names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the
lowercase kubectl
/**
 * Represents a step in preparing the Kubernetes driver.
 */
private[spark] trait KubernetesSubmissionStep {
Just DriverStep? Goes with the InitContainerStep.
DriverStep sounds pretty vague - we want to indicate that we're configuring the driver somehow. Maybe DriverConfigurationStep and InitContainerConfigurationStep.
Oh I like DriverConfigurationStep and InitContainerConfigurationStep
…n-k8s/spark into submission-steps-refactor
Multi-line methods should have four-space indentation for arguments that aren't on the same line as the method call itself... but this is difficult to do consistently given how IDEs handle Scala multi-line indentation in most cases.
* Submission client redesign to use a step-based builder pattern.
* Add a unit test for BaseSubmissionStep.
* Add unit test for kubernetes credentials mounting.
* Add unit test for InitContainerBootstrapStep.
* unit tests for initContainer
* Add a unit test for DependencyResolutionStep.
* further modifications to InitContainer unit tests
* Use of resolver in PythonStep and unit tests for PythonStep
* refactoring of init unit tests and pythonstep resolver logic
* Add unit test for KubernetesSubmissionStepsOrchestrator.
* refactoring and addition of secret trustStore+Cert checks in a SubmissionStepSuite
* added SparkPodInitContainerBootstrapSuite
* Added InitContainerResourceStagingServerSecretPluginSuite
* style in Unit tests
* extremely minor style fix in variable naming
* Address comments.
* Rename class for consistency.
* Attempt to make spacing consistent.
Applies changes assuming PySpark is present from #364 .
This change overhauls the underlying architecture of the submission client, but it is intended to entirely preserve existing behavior of Spark applications. Therefore users will find this to be an invisible change.
The philosophy behind this design is to reconsider the breakdown of the submission process. It operates off the abstraction of "submission steps", which are transformation functions that take the previous state of the driver and return the new state of the driver. The driver's state includes its Spark configurations and the Kubernetes resources that will be used to deploy it.
Such a refactor moves away from a features-first API design, which considers different containers to serve a set of features. The previous design, for example, had a container files resolver API object that returned different resolutions of the dependencies added by the user. However, it was up to the main Client to know how to intelligently invoke all of those APIs. Therefore the API surface area of the file resolver became untenably large and it was not intuitive of how it was to be used or extended.
This design changes the encapsulation layout; every module is now responsible for changing the driver specification directly. An orchestrator builds the correct chain of steps and hands it to the client, which then calls it verbatim. The main client then makes any final modifications that put the different pieces of the driver together, particularly to attach the driver container itself to the pod and to apply the Spark configuration as command-line arguments.
The current steps are:
- BaseSubmissionStep: Baseline configurations such as the docker image and resource requests.
- DriverKubernetesCredentialsStep: Resolves Kubernetes credentials configuration in the driver pod. Mounts a secret if necessary.
- InitContainerBootstrapStep: Attaches the init-container, if necessary, to the driver pod. This is optional and won't be loaded if all URIs are "local" or there are no URIs at all.
- DependencyResolutionStep: Sets the classpath, spark.jars, and spark.files properties. This step is partially not isolated as it assumes that files that are remote or locally submitted will be downloaded to a given location. Unit tests should verify that this contract holds.
- PythonStep: Configures Python environment variables if using PySpark.
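The step-chaining idea described above can be sketched in a few lines: each step is a pure transformation from driver spec to driver spec, and the client folds the orchestrator's chosen steps over an initial spec. The types here are pared down from the PR's KubernetesDriverSpec for illustration; the two example steps stand in for the real BaseSubmissionStep and DependencyResolutionStep.

```scala
// A driver spec holds the state each step transforms: Spark configuration
// plus the Kubernetes resources accumulated for deployment.
case class DriverSpec(sparkConf: Map[String, String], resources: Seq[String])

trait SubmissionStep {
  def prepareSubmission(spec: DriverSpec): DriverSpec
}

object BaseStep extends SubmissionStep {
  def prepareSubmission(spec: DriverSpec): DriverSpec =
    spec.copy(sparkConf = spec.sparkConf + ("spark.app.name" -> "demo"))
}

object DependencyStep extends SubmissionStep {
  def prepareSubmission(spec: DriverSpec): DriverSpec =
    spec.copy(resources = spec.resources :+ "init-container-config-map")
}

// In the real client an orchestrator selects the chain based on the
// submission; here it is hardcoded for the sketch.
val steps: Seq[SubmissionStep] = Seq(BaseStep, DependencyStep)

// The client applies the chain verbatim: fold the steps over an empty spec.
val finalSpec = steps.foldLeft(DriverSpec(Map.empty, Seq.empty)) {
  (spec, step) => step.prepareSubmission(spec)
}
```

Because every step only sees the spec produced by its predecessor, adding, removing, or reordering features becomes a matter of editing the step sequence rather than the main Client.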