This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Use the driver pod IP address for spark.driver.bindAddress #533

Merged: 4 commits into apache-spark-on-k8s:branch-2.2-kubernetes on Oct 26, 2017

Conversation

@liyinan926 (Member) commented Oct 20, 2017

What changes were proposed in this pull request?

This PR attempts to fix the issue reported in #523, which may happen if the driver tries to bind to the driver host name before the endpoints controller has updated the DNS configuration for the driver's headless service.

Changes:
The submission client no longer sets spark.driver.bindAddress based on the name of the headless service for the driver in DriverAddressConfigurationStep, which is renamed to DriverServiceBootstrapStep in this PR. Instead, this PR introduces a new environment variable SPARK_DRIVER_BIND_ADDRESS that gets its value from status.podIP through the downward API, so at runtime SPARK_DRIVER_BIND_ADDRESS holds the driver pod's IP address. We can then pass -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS in the Dockerfile to give spark.driver.bindAddress the right value. The submission client still sets spark.driver.host to the driver service DNS name, though.

Tested on a GKE cluster using the SparkPi example. Verified that the following showed up in the driver container:

SPARK_DRIVER_BIND_ADDRESS=10.44.2.26

And the driver pod YAML contained the following:

    - name: SPARK_DRIVER_BIND_ADDRESS
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: status.podIP
    - name: SPARK_JAVA_OPT_9
      value: -Dspark.driver.host=spark-pi-1508880029167-driver-svc.default.svc.cluster.local
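
For reference, a rough sketch of how a submission step could construct such an env var with the fabric8 Kubernetes client's builders (illustrative only; the variable name below is not taken from the actual code):

    import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder, EnvVarSourceBuilder}

    // Build an env var whose value the kubelet resolves from the pod's own IP via the
    // downward API, instead of from a literal value.
    val driverBindAddressEnv: EnvVar = new EnvVarBuilder()
      .withName("SPARK_DRIVER_BIND_ADDRESS")
      .withValueFrom(new EnvVarSourceBuilder()
        .withNewFieldRef()
          .withApiVersion("v1")
          .withFieldPath("status.podIP")
          .endFieldRef()
        .build())
      .build()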

@foxish @mccheah @kimoonkim

@kimoonkim (Member) left a comment:

The change looks good to me. Thanks!

Maybe @mccheah or @ifilonenko can take a look as well?

@liyinan926 (Member, Author)

rerun integration tests please

@@ -147,7 +128,7 @@ private[spark] class DriverAddressConfigurationStepSuite
     } catch {
       case e: Throwable =>
         assert(e.getMessage ===
-          s"requirement failed: ${DriverAddressConfigurationStep.DRIVER_HOST_KEY} is" +
+          s"requirement failed: ${DriverServiceBootstrapStep.DRIVER_HOST_KEY} is" +
A Member commented:

this should be indented 2 spaces not 4 (same for the ones above).
http://spark.apache.org/contributing.html

liyinan926 (Member, Author) replied:

Done.

@@ -108,7 +91,7 @@ private[spark] class DriverAddressConfigurationStepSuite
   }

   test("Long prefixes should switch to using a generated name.") {
-    val configurationStep = new DriverAddressConfigurationStep(
+    val configurationStep = new DriverServiceBootstrapStep(
       LONG_RESOURCE_NAME_PREFIX,
A Member commented:

this should be indented 2 spaces not 4 (same for the ones above).

liyinan926 (Member, Author) replied:

Done.

@mccheah commented Oct 23, 2017

Has this been tested in production clusters?

@mccheah commented Oct 23, 2017

Particularly with SPARK-21642 also merged.

@mccheah commented Oct 23, 2017

Actually, I wanted to circle back to this and to what we concluded in #523.

From the comments in the issue, it sounds like the problem is a race condition: the Endpoints object for the driver's service has not yet been created when the binding occurs.

I wonder if we can just fix the race condition itself. Can the scheduler backend's initialization have a Watch that blocks until the Endpoints object is ready before attempting to bind to the given hostname? That seems more idiomatic and fixes the actual underlying problem, rather than working with IP address magic. But there are pros and cons either way.

If we use the IP address, is kube-dns a requirement for this code path? I think we still rely on kube-dns to resolve the K8s master hostname, but relying less on kube-dns seems beneficial anyway.
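
For illustration, a rough sketch of what such a blocking Watch could look like with the fabric8 Kubernetes client (a hypothetical helper, not from the actual codebase; the 60-second timeout is arbitrary):

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    import io.fabric8.kubernetes.api.model.Endpoints
    import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}

    // Block until the Endpoints object backing the driver service has at least one
    // subset, i.e. the endpoints controller has picked up the driver pod.
    def awaitDriverServiceEndpoints(
        client: KubernetesClient,
        namespace: String,
        serviceName: String): Unit = {
      val ready = new CountDownLatch(1)
      def isPopulated(endpoints: Endpoints): Boolean =
        endpoints != null && endpoints.getSubsets != null && !endpoints.getSubsets.isEmpty

      val resource = client.endpoints().inNamespace(namespace).withName(serviceName)
      val watch = resource.watch(new Watcher[Endpoints] {
        override def eventReceived(action: Watcher.Action, endpoints: Endpoints): Unit = {
          if (isPopulated(endpoints)) ready.countDown()
        }
        override def onClose(cause: KubernetesClientException): Unit = {}
      })
      try {
        // Also check the current state, in case the Endpoints object was already
        // populated before the watch was registered.
        if (isPopulated(resource.get())) ready.countDown()
        if (!ready.await(60, TimeUnit.SECONDS)) {
          throw new IllegalStateException(
            s"Timed out waiting for Endpoints $serviceName in namespace $namespace")
        }
      } finally {
        watch.close()
      }
    }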

@kimoonkim (Member)

@mccheah Excellent questions.

Can the scheduler backend's initialization have a Watch that blocks until the endpoint object is ready before attempting to bind to the given hostname?

This might also work. However, it could be difficult to block the binding until the endpoint is ready. It depends on which Spark core classes are in charge of binding the ports. (I think there are actually two: the scheduler backend port and the block manager port.) Personally, I do not know which core classes handle the binding.

If we use the IP address, is kube-dns a requirement for this code path?

The IP address approach allows the driver to avoid the kube-dns dependency. But executors still use the service name to connect to the driver, so I think we still need kube-dns for this feature to work. Note that it is still possible for executors to come up before the service endpoint is created, but that race condition is very unlikely.

I think we still rely on kube-dns to resolve the K8s master hostname

For the k8s master hostname, we are relying on the underlying bare-metal DNS, not kube-dns. kube-dns is only in charge of service-to-pod mappings.

@mccheah commented Oct 24, 2017

it is still possible for executors to come up before the service endpoint is created. But this race condition is very very unlikely.

If we wanted, we could block this by polling for the Endpoints resource's readiness in the scheduler backend; we know for sure that the scheduler needs to initialize before it requests any executors, unlike the general case where we don't know when the driver will attempt to bind to an address.

Having it bind to the IP address instead seems fine, but I think we want to make sure this is tested with SPARK-21642 merged. If someone can custom-build and test Spark with the appropriate patches, that would be fantastic. Something like what @ash211 did with #484.
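
A polling variant of the same check, as suggested here, could look roughly like this (again just a sketch; the interval and timeout values are arbitrary):

    import io.fabric8.kubernetes.client.KubernetesClient

    // Hypothetical polling loop the scheduler backend could run before it starts
    // requesting executors.
    def pollForDriverServiceEndpoints(
        client: KubernetesClient,
        namespace: String,
        serviceName: String,
        timeoutMillis: Long = 30000L,
        intervalMillis: Long = 500L): Boolean = {
      val deadline = System.currentTimeMillis() + timeoutMillis
      while (System.currentTimeMillis() < deadline) {
        val endpoints = client.endpoints().inNamespace(namespace).withName(serviceName).get()
        if (endpoints != null && endpoints.getSubsets != null && !endpoints.getSubsets.isEmpty) {
          return true
        }
        Thread.sleep(intervalMillis)
      }
      false
    }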

@liyinan926 (Member, Author)

I'm testing a custom build with changes from SPARK-21642. Will report back later.

@liyinan926 (Member, Author)

If we wanted, we could block this by polling for the Endpoints resource's readiness in the scheduler backend; we know for sure that the scheduler needs to initialize before it requests any executors, unlike the general case where we don't know when the driver will attempt to bind to an address.

I agree the scheduler backend should block and wait for the Endpoints resource to become ready before launching executor pods.

@mccheah commented Oct 24, 2017

That would require a minor and very subtle API break. We allow the driver to be provided specific credentials so that the driver has only the minimum privilege needed to create and delete pods. Now the driver will also need privileges to read the status of Endpoints in its namespace. I don't see this as a significant concern, but I am noting it here for reference.

@liyinan926 (Member, Author) commented Oct 24, 2017

There seems to be an issue when the changes from this PR are combined with the changes from SPARK-21642. I got the following exception when running the SparkPi example:

Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:32)
	at org.apache.spark.SparkEnv$.registerOrLookupEndpoint$1(SparkEnv.scala:300)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:314)
	at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:223)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
	... 4 more
Caused by: java.io.IOException: Failed to connect to spark-pi-1508813143731-driver:7078
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)

The problem is that when the executor started up, CoarseGrainedExecutorBackend tried to create a SparkEnv and called RpcUtils.makeDriverRef somewhere. makeDriverRef constructs the driver endpoint using RpcAddress(driverHost, driverPort), where driverHost gets its value from spark.driver.host. With the changes from SPARK-21642, spark.driver.host by default resolves to the hostname of the driver pod, which by default is the name of the pod, i.e., spark-pi-1508813143731-driver. That's why it failed with Failed to connect to spark-pi-1508813143731-driver:7078.
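
For context, that executor-side code path looks roughly like this (paraphrased from Spark 2.2's RpcUtils.makeDriverRef, not quoted verbatim):

    import org.apache.spark.SparkConf
    import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}

    // The executor builds the driver's RPC address purely from spark.driver.host and
    // spark.driver.port, so whatever spark.driver.host is set to must be resolvable
    // and reachable from the executor.
    def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
      val driverHost = conf.get("spark.driver.host", "localhost")
      val driverPort = conf.getInt("spark.driver.port", 7077)
      rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
    }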

The hostname of the driver pod can be customized using the hostname field of the pod spec. We can definitely set it to the IP address of the driver pod in the scheduler backend, and by doing so spark.driver.host would take the driver pod's IP address and the issue would be solved. But I'm not sure if this defeats the purpose of SPARK-21642.

@liyinan926 (Member, Author) commented Oct 24, 2017

Actually, to solve the issue above, we just need to pass spark.driver.host, set to the driver service DNS name, down to the executors. I will give it a try and report back.

@liyinan926 (Member, Author) commented Oct 24, 2017

After failing to make the approach above work, I landed on a totally different approach that finally works. I tested the new solution on a GKE cluster with the changes from SPARK-21642 merged. Please see the updated PR description.

@mccheah commented Oct 24, 2017

I believe the main point of SPARK-21642 was to tie the driver to a hostname instead of an IP address. It seems like moving to set the host to an IP address is a regression in this behavior. Should we still be trying to make this work with the service hostname?

@liyinan926 (Member, Author) commented Oct 24, 2017

We are still setting spark.driver.host to the driver service DNS name, so the executors will still try to talk to the driver through the service. The only change is to set spark.driver.bindAddress to the actual pod IP address, so no DNS resolution is needed when the driver binds to that address.
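
To make the split concrete, the effective driver-side settings after this change look roughly like this (a sketch using the example values from the test run above, not code from the PR):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Executors reach the driver through the headless service, resolved by kube-dns:
      .set("spark.driver.host",
        "spark-pi-1508880029167-driver-svc.default.svc.cluster.local")
      // The driver itself binds to its own pod IP, taken from the downward-API env var,
      // so no DNS lookup is needed at bind time:
      .set("spark.driver.bindAddress", sys.env("SPARK_DRIVER_BIND_ADDRESS"))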

@kimoonkim (Member) commented Oct 24, 2017

I believe the main point of SPARK-21642 was to tie the driver to a hostname instead of an IP address.

I was curious why it does that. It is helpful to consider the behavior in two pieces:

  1. Executors connect to the driver using a hostname instead of an IP address.
  2. The driver binds to the service ports using a hostname instead of an IP address.

Is (1) the only point of SPARK-21642, or is (2) also important for SPARK-21642?

@liyinan926 (Member, Author)

@kimoonkim Very good points. IMO (1) is what SPARK-21642 is really all about; (2) is a side effect of it, simply because spark.driver.bindAddress falls back to spark.driver.host if not set.

s"requirement failed: ${DriverAddressConfigurationStep.DRIVER_BIND_ADDRESS_KEY} is" +
s" not supported in Kubernetes mode, as the driver's hostname will be managed via" +
s" a Kubernetes service.")
s"requirement failed: ${DriverServiceBootstrapStep.DRIVER_BIND_ADDRESS_KEY} is" +
Review comment:

The error message should probably say "the driver's bind address will be managed...". It's also no longer managed by a Kubernetes service, but by the pod's IP address.

liyinan926 (Member, Author) replied:

Done.

@@ -75,7 +75,7 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
       .asScala
       .map(env => (env.getName, env.getValue))
       .toMap
-    assert(envs.size === 6)
+    assert(envs.size === 7)
Review comment:

Check the value of the new env we put here.

@liyinan926 (Member, Author) replied on Oct 25, 2017:

We cannot check the value, as it will be null unless there's a way to set status.podIP. But we can definitely check for the existence of the new env key. Added a check.

Review comment:

Shouldn't we be able to check that there is a valueFrom field?

Review comment:

Ah, because we're setting the envs in a map but the valueFrom isn't captured in that map. Looks like we'll need to break out of the Map[envKey -> envValue] paradigm to check this.

liyinan926 (Member, Author) replied:

Done.

@mccheah commented Oct 25, 2017

Looks ok apart from the minor comments.

    val envDriverBindAddress = preparedDriverSpec.driverContainer
      .getEnv
      .asScala
      .filter(envVar => envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS))
@mccheah commented on Oct 25, 2017:

nit: this can all be compacted into an exists call, e.g.

    val hasBindAddressWithPodIP = preparedDriverSpec.driverContainer.getEnv.asScala.exists { envVar =>
      envVar.getName == ... && envVar.getValueFrom....
    }
    assert(hasBindAddressWithPodIP, <message>)

etc.

liyinan926 (Member, Author) replied:

Done.

@mccheah left a comment:

I'll merge when the build passes; feel free to merge if I miss it otherwise.

@liyinan926 merged commit 6b1caca into apache-spark-on-k8s:branch-2.2-kubernetes on Oct 26, 2017
puneetloya pushed a commit to puneetloya/spark that referenced this pull request on Mar 11, 2019:

Use the driver pod IP address for spark.driver.bindAddress (apache-spark-on-k8s#533)

* Use the driver pod IP address for spark.driver.bindAddress

* Addressed comments

* Addressed more comments

* Fixed broken DriverServiceBootstrapStepSuite
ifilonenko pushed a commit to bloomberg/apache-spark-on-k8s that referenced this pull request Jun 7, 2019