Skip to content
This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Use the driver pod IP address for spark.driver.bindAddress #533

Merged
merged 4 commits into from
Oct 26, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ package object constants {
// Environment Variables
private[spark] val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT"
private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
private[spark] val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
private[spark] val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep
import org.apache.spark.launcher.SparkLauncher
Expand Down Expand Up @@ -103,7 +103,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
mainClass,
appArgs,
submissionSparkConf)
val driverAddressStep = new DriverAddressConfigurationStep(
val driverAddressStep = new DriverServiceBootstrapStep(
kubernetesResourceNamePrefix,
allDriverLabels,
submissionSparkConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.deploy.k8s.submit.submitsteps

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, PodBuilder, QuantityBuilder}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
import scala.collection.JavaConverters._

import org.apache.spark.SparkConf
Expand Down Expand Up @@ -114,6 +114,12 @@ private[spark] class BaseDriverConfigurationStep(
.withName(ENV_DRIVER_ARGS)
.withValue(appArgs.mkString(" "))
.endEnv()
.addNewEnv()
.withName(ENV_DRIVER_BIND_ADDRESS)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "status.podIP")
.build())
.endEnv()
.withNewResources()
.addToRequests("cpu", driverCpuQuantity)
.addToRequests("memory", driverMemoryQuantity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,20 @@ import org.apache.spark.util.Clock
* Allows the driver to be reachable by executor pods through a headless service. The service's
* ports should correspond to the ports that the executor will reach the pod at for RPC.
*/
private[spark] class DriverAddressConfigurationStep(
private[spark] class DriverServiceBootstrapStep(
kubernetesResourceNamePrefix: String,
driverLabels: Map[String, String],
submissionSparkConf: SparkConf,
clock: Clock) extends DriverConfigurationStep with Logging {
import DriverAddressConfigurationStep._
import DriverServiceBootstrapStep._

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's hostname" +
s" will be managed via a Kubernetes service.")
s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's hostname" +
s" will be managed via a Kubernetes service.")
require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" +
s" managed via a Kubernetes service.")
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" +
s" managed via a Kubernetes service.")

val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
Expand All @@ -51,8 +51,8 @@ private[spark] class DriverAddressConfigurationStep(
val randomServiceId = clock.getTimeMillis()
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is" +
s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" +
s" as the driver service's name.")
s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" +
s" as the driver service's name.")
shorterServiceName
}

Expand Down Expand Up @@ -82,19 +82,18 @@ private[spark] class DriverAddressConfigurationStep(
val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
val resolvedSparkConf = driverSpec.driverSparkConf.clone()
.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, driverHostname)
.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname)
.set("spark.driver.port", driverPort.toString)
.set(
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)

driverSpec.copy(
driverSparkConf = resolvedSparkConf,
otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService))
driverSparkConf = resolvedSparkConf,
otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService))
}
}

private[spark] object DriverAddressConfigurationStep {
private[spark] object DriverServiceBootstrapStep {
val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
val DRIVER_SVC_POSTFIX = "-driver-svc"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}

private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {

Expand Down Expand Up @@ -50,7 +50,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep])
Expand All @@ -74,7 +74,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep],
Expand All @@ -97,7 +97,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep],
Expand All @@ -120,7 +120,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep],
Expand All @@ -144,7 +144,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep],
Expand All @@ -169,7 +169,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,33 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
driverContainer = new ContainerBuilder().build(),
driverSparkConf = new SparkConf(false),
otherKubernetesResources = Seq.empty[HasMetadata])

val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec)

assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
assert(preparedDriverSpec.driverContainer.getImagePullPolicy === DOCKER_IMAGE_PULL_POLICY)

assert(preparedDriverSpec.driverContainer.getEnv.size === 7)
val envs = preparedDriverSpec.driverContainer
.getEnv
.asScala
.map(env => (env.getName, env.getValue))
.toMap
assert(envs.size === 6)
assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar")
assert(envs(ENV_DRIVER_MEMORY) === "256M")
assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2")
assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")

val envDriverBindAddress = preparedDriverSpec.driverContainer
.getEnv
.asScala
.filter(envVar => envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS))
Copy link

@mccheah mccheah Oct 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit -> can compact this all into an exists call. E.g.

val hasBindAddressWithPodIP = preparedDriverSpec.driverContainer.getEnv.asScala.exists { envVar => envVar.getName == ENV_DRIVER_BIND_ADDRESS && envVar.getValueFrom.getFieldRef.getFieldPath == "status.podIP" }
assert(hasBindAddressWithPodIP, <message>)

etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

.head
assert(envDriverBindAddress.getValueFrom.getFieldRef.getApiVersion.equals("v1"))
assert(envDriverBindAddress.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP"))

val resourceRequirements = preparedDriverSpec.driverContainer.getResources
val requests = resourceRequirements.getRequests.asScala
assert(requests("cpu").getAmount === "2")
Expand Down
Loading