Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 141 additions & 53 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,17 @@ building using the supplied script, or manually.

To launch Spark Pi in cluster mode,

{% highlight bash %}
```bash
$ bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.driver.docker.image=<driver-image> \
--conf spark.kubernetes.executor.docker.image=<executor-image> \
--conf spark.kubernetes.driver.container.image=<driver-image> \
--conf spark.kubernetes.executor.container.image=<executor-image> \
local:///path/to/examples.jar
{% endhighlight %}
```

The Spark master, specified either via passing the `--master` command line argument to `spark-submit` or by setting
`spark.master` in the application's configuration, must be a URL with the format `k8s://<api_server_url>`. Prefixing the
Expand Down Expand Up @@ -120,6 +120,54 @@ by their appropriate remote URIs. Also, application dependencies can be pre-moun
Those dependencies can be added to the classpath by referencing them with `local://` URIs and/or setting the
`SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles.

### Using Remote Dependencies
When there are application dependencies hosted in remote locations like HDFS or HTTP servers, the driver and executor pods
need a Kubernetes [init-container](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/) for downloading
the dependencies so the driver and executor containers can use them locally. This requires users to specify the container
image for the init-container using the configuration property `spark.kubernetes.initContainer.image`. For example, users
simply add the following option to the `spark-submit` command to specify the init-container image:

```
--conf spark.kubernetes.initContainer.image=<init-container image>
```

The init-container handles remote dependencies specified in `spark.jars` (or the `--jars` option of `spark-submit`) and
`spark.files` (or the `--files` option of `spark-submit`). It also handles remotely hosted main application resources, e.g.,
the main application jar. The following shows an example of using remote dependencies with the `spark-submit` command:

```bash
$ bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--jars https://path/to/dependency1.jar,https://path/to/dependency2.jar
--files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2
--conf spark.executor.instances=5 \
--conf spark.kubernetes.driver.container.image=<driver-image> \
--conf spark.kubernetes.executor.container.image=<executor-image> \
--conf spark.kubernetes.initContainer.image=<init-container image>
https://path/to/examples.jar
```

## Secret Management
Kubernetes [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) can be used to provide credentials for a
Spark application to access secured services. To mount a user-specified secret into the driver container, users can use
the configuration property of the form `spark.kubernetes.driver.secrets.[SecretName]=<mount path>`. Similarly, the
configuration property of the form `spark.kubernetes.executor.secrets.[SecretName]=<mount path>` can be used to mount a
user-specified secret into the executor containers. Note that it is assumed that the secret to be mounted is in the same
namespace as that of the driver and executor pods. For example, to mount a secret named `spark-secret` onto the path
`/etc/secrets` in both the driver and executor containers, add the following options to the `spark-submit` command:

```
--conf spark.kubernetes.driver.secrets.spark-secret=/etc/secrets
--conf spark.kubernetes.executor.secrets.spark-secret=/etc/secrets
```

Note that if an init-container is used, any secret mounted into the driver container will also be mounted into the
init-container of the driver. Similarly, any secret mounted into an executor container will also be mounted into the
init-container of the executor.

## Introspection and Debugging

These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
Expand Down Expand Up @@ -275,7 +323,7 @@ specific to Spark on Kubernetes.
<td><code>(none)</code></td>
<td>
Container image to use for the driver.
This is usually of the form `example.com/repo/spark-driver:v1.0.0`.
This is usually of the form <code>example.com/repo/spark-driver:v1.0.0</code>.
This configuration is required and must be provided by the user.
</td>
</tr>
Expand All @@ -284,7 +332,7 @@ specific to Spark on Kubernetes.
<td><code>(none)</code></td>
<td>
Container image to use for the executors.
This is usually of the form `example.com/repo/spark-executor:v1.0.0`.
This is usually of the form <code>example.com/repo/spark-executor:v1.0.0</code>.
This configuration is required and must be provided by the user.
</td>
</tr>
Expand Down Expand Up @@ -528,51 +576,91 @@ specific to Spark on Kubernetes.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.limit.cores</code></td>
<td>(none)</td>
<td>
Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.limit.cores</code></td>
<td>(none)</td>
<td>
Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.node.selector.[labelKey]</code></td>
<td>(none)</td>
<td>
Adds to the node selector of the driver pod and executor pods, with key <code>labelKey</code> and the value as the
configuration's value. For example, setting <code>spark.kubernetes.node.selector.identifier</code> to <code>myIdentifier</code>
will result in the driver pod and executors having a node selector with key <code>identifier</code> and value
<code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
Add the environment variable specified by <code>EnvironmentVariableName</code> to
the Driver process. The user can specify multiple of these to set multiple environment variables.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountDependencies.jarsDownloadDir</code></td>
<td><code>/var/spark-data/spark-jars</code></td>
<td>
Location to download jars to in the driver and executors.
This directory must be empty and will be mounted as an empty directory volume on the driver and executor pods.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountDependencies.filesDownloadDir</code></td>
<td><code>/var/spark-data/spark-files</code></td>
<td>
Location to download jars to in the driver and executors.
This directory must be empty and will be mounted as an empty directory volume on the driver and executor pods.
</td>
</tr>
<td><code>spark.kubernetes.driver.limit.cores</code></td>
<td>(none)</td>
<td>
Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.limit.cores</code></td>
<td>(none)</td>
<td>
Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.node.selector.[labelKey]</code></td>
<td>(none)</td>
<td>
Adds to the node selector of the driver pod and executor pods, with key <code>labelKey</code> and the value as the
configuration's value. For example, setting <code>spark.kubernetes.node.selector.identifier</code> to <code>myIdentifier</code>
will result in the driver pod and executors having a node selector with key <code>identifier</code> and value
<code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
Add the environment variable specified by <code>EnvironmentVariableName</code> to
the Driver process. The user can specify multiple of these to set multiple environment variables.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountDependencies.jarsDownloadDir</code></td>
<td><code>/var/spark-data/spark-jars</code></td>
<td>
Location to download jars to in the driver and executors.
This directory must be empty and will be mounted as an empty directory volume on the driver and executor pods.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountDependencies.filesDownloadDir</code></td>
<td><code>/var/spark-data/spark-files</code></td>
<td>
Location to download jars to in the driver and executors.
This directory must be empty and will be mounted as an empty directory volume on the driver and executor pods.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountDependencies.timeout</code></td>
<td>300s</td>
<td>
Timeout in seconds before aborting the attempt to download and unpack dependencies from remote locations into
the driver and executor pods.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountDependencies.maxSimultaneousDownloads</code></td>
<td>5</td>
<td>
Maximum number of remote dependencies to download simultaneously in a driver or executor pod.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.initContainer.image</code></td>
<td>(none)</td>
<td>
Container image for the <a href="https://kubernetes.io/docs/concepts/workloads/pods/init-containers/">init-container</a> of the driver and executors for downloading dependencies. This is usually of the form <code>example.com/repo/spark-init:v1.0.0</code>. This configuration is optional and must be provided by the user if any non-container local dependency is used and must be downloaded remotely.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.secrets.[SecretName]</code></td>
<td>(none)</td>
<td>
Add the <a href="https://kubernetes.io/docs/concepts/configuration/secret/">Kubernetes Secret</a> named <code>SecretName</code> to the driver pod on the path specified in the value. For example,
<code>spark.kubernetes.driver.secrets.spark-secret=/etc/secrets</code>. Note that if an init-container is used,
the secret will also be added to the init-container in the driver pod.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.secrets.[SecretName]</code></td>
<td>(none)</td>
<td>
Add the <a href="https://kubernetes.io/docs/concepts/configuration/secret/">Kubernetes Secret</a> named <code>SecretName</code> to the executor pod on the path specified in the value. For example,
<code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>. Note that if an init-container is used,
the secret will also be added to the init-container in the executor pod.
</td>
</tr>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ final class OneVsRestModel private[ml] (
val newDataset = dataset.withColumn(accColName, initUDF())

// persist if underlying dataset is not persistent.
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) {
newDataset.persist(StorageLevel.MEMORY_AND_DISK)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

private[spark] object Config extends Logging {

Expand Down Expand Up @@ -132,30 +131,84 @@ private[spark] object Config extends Logging {

val JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
.doc("Location to download jars to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pod.")
.doc("Location to download jars to in the driver and executors. When using " +
"spark-submit, this directory must be empty and will be mounted as an empty directory " +
"volume on the driver and executor pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-jars")

val FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
.doc("Location to download files to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pods.")
.doc("Location to download files to in the driver and executors. When using " +
"spark-submit, this directory must be empty and will be mounted as an empty directory " +
"volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-files")

val INIT_CONTAINER_IMAGE =
ConfigBuilder("spark.kubernetes.initContainer.image")
.doc("Image for the driver and executor's init-container for downloading dependencies.")
.stringConf
.createOptional

val INIT_CONTAINER_MOUNT_TIMEOUT =
ConfigBuilder("spark.kubernetes.mountDependencies.timeout")
.doc("Timeout before aborting the attempt to download and unpack dependencies from remote " +
"locations into the driver and executor pods.")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(300)

val INIT_CONTAINER_MAX_THREAD_POOL_SIZE =
ConfigBuilder("spark.kubernetes.mountDependencies.maxSimultaneousDownloads")
.doc("Maximum number of remote dependencies to download simultaneously in a driver or " +
"executor pod.")
.intConf
.createWithDefault(5)

val INIT_CONTAINER_REMOTE_JARS =
ConfigBuilder("spark.kubernetes.initContainer.remoteJars")
.doc("Comma-separated list of jar URIs to download in the init-container. This is " +
"calculated from spark.jars.")
.internal()
.stringConf
.createOptional

val INIT_CONTAINER_REMOTE_FILES =
ConfigBuilder("spark.kubernetes.initContainer.remoteFiles")
.doc("Comma-separated list of file URIs to download in the init-container. This is " +
"calculated from spark.files.")
.internal()
.stringConf
.createOptional

val INIT_CONTAINER_CONFIG_MAP_NAME =
ConfigBuilder("spark.kubernetes.initContainer.configMapName")
.doc("Name of the config map to use in the init-container that retrieves submitted files " +
"for the executor.")
.internal()
.stringConf
.createOptional

val INIT_CONTAINER_CONFIG_MAP_KEY_CONF =
ConfigBuilder("spark.kubernetes.initContainer.configMapKey")
.doc("Key for the entry in the init container config map for submitted files that " +
"corresponds to the properties for this init-container.")
.internal()
.stringConf
.createOptional

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."

val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."

val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ private[spark] object Constants {
val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"

// Bootstrapping dependencies with the init-container
val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files-volume"
val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties"
val INIT_CONTAINER_PROPERTIES_FILE_DIR = "/etc/spark-init"
val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties"
val INIT_CONTAINER_PROPERTIES_FILE_PATH =
s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
Expand Down
Loading