Conversation

@falaki (Contributor) commented Oct 13, 2016

What changes were proposed in this pull request?

This patch makes the RBackend connection timeout configurable by the user.

How was this patch tested?

N/A

SparkQA commented Oct 13, 2016

Test build #66915 has finished for PR 15471 at commit 9dde457.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Oct 14, 2016

Test build #66928 has finished for PR 15471 at commit d27233c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung (Member)

would it be possible to pull "6000" into a constant so it could be found/changed in one shot?
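A minimal sketch of what pulling the magic number into one place could look like (the object and constant names below are illustrative, not necessarily those in the final patch, though the diff later does reference a SparkRDefaults object for similar defaults):

```scala
// Hypothetical defaults object so the magic number lives in exactly one place.
object SparkRDefaults {
  // Illustrative constant name; both the JVM side and the code that launches
  // the R side would refer to this instead of a hard-coded 6000.
  val DEFAULT_CONNECTION_TIMEOUT: Int = 6000
}
```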

@falaki falaki changed the title [SPARK-17919] Make timeout to RBackend configurable in SparkR [WIP][SPARK-17919] Make timeout to RBackend configurable in SparkR Oct 14, 2016
@falaki (Contributor, Author) commented Oct 14, 2016

@felixcheung that is a good suggestion. I will try to use a single constant.
I changed the label to WIP because something is still timing out the connection in my tests. Maybe very long timeouts are not properly implemented in R? As you may have noticed, I am setting the timeout on all the connections that are ever opened.

@shivaram I am going to experiment with a helper thread inside the main netty handler that sends heartbeats (-1 as the result); invokeJava() would then try reading again when it sees the heartbeat value. What do you think? I noticed we establish and keep a monitor connection as monitorConn but don't use it anywhere!? Am I right?

@shivaram (Contributor)

@falaki That's an interesting approach to try -- the other thing to try might be to use a separate connection to send back results, i.e. as soon as the JVM gets the request it returns OK or some such status, and then whenever the result is ready the JVM pushes data on the reader socket that is created in R. We might need to try a couple of options here to see which looks best.

BTW the monitorConn is used - it's used in cases where the JVM comes up first, say in YARN / spark-submit, and the JVM uses it to detect whether the R process has crashed.

@falaki (Contributor, Author) commented Oct 14, 2016

@shivaram thanks for the clarification.
I realized we were not setting a socket timeout on the Netty socket, so I added that as well.
I also introduced the heartbeat mechanism and tested it locally. Next I am going to test whether this works on a real workload.

SparkQA commented Oct 14, 2016

Test build #66997 has finished for PR 15471 at commit 630467e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Oct 15, 2016

Test build #67001 has finished for PR 15471 at commit 3744623.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Oct 15, 2016

Test build #67005 has finished for PR 15471 at commit 6f15a15.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@falaki falaki changed the title [WIP][SPARK-17919] Make timeout to RBackend configurable in SparkR [SPARK-17919] Make timeout to RBackend configurable in SparkR Oct 17, 2016
@falaki (Contributor, Author) commented Oct 17, 2016

@shivaram this worked in my stress tests. The question is how to unit test it.

@yhuai (Contributor) commented Oct 17, 2016

test this please

@shivaram (Contributor)

@falaki Do we know if the test timeouts are due to this change? Or are they unrelated?

SparkQA commented Oct 17, 2016

Test build #67079 has finished for PR 15471 at commit 6f15a15.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@falaki (Contributor, Author) commented Oct 17, 2016

@shivaram I think they are unrelated. Can you trigger another test?

@shivaram (Contributor)

Jenkins, retest this please

SparkQA commented Oct 18, 2016

Test build #67091 has finished for PR 15471 at commit 6f15a15.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shivaram (Contributor)

@falaki looks like the SparkR MLlib unit tests are timing out on Jenkins. Do they pass on your machine?

@falaki (Contributor, Author) commented Oct 19, 2016

@shivaram it was indeed my fault. I did not run the local tests after I added the heartbeat. I am now using +1 for the heartbeat.

SparkQA commented Oct 19, 2016

Test build #67153 has finished for PR 15471 at commit fcc3376.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung (Member) left a comment

Looks good aside from minor comments.
This goes to 2.1.0?

}
val conf = new SparkConf()
val heartBeatInterval = conf.getInt(
"spark.r.heartBeatInterval", SparkRDefaults.DEFAULT_HEARTBEAT_INTERVAL)
Member:

should this be documented too?

Contributor Author:

Done.

}
} else {
// To avoid timeouts when reading results in SparkR driver, we will be regularly sending
// heartbeat responses. We use special character -1 to signal the client that backend is
Member:

-1 -> +1?

returnStatus <- readInt(conn)
handleErrors(returnStatus, conn)

# Backend will send -1 as keep alive value to prevent various connection timeouts
Member:

-1 -> +1?

cause match {
case timeout: ReadTimeoutException =>
// Do nothing. We don't want to timeout on read
logInfo("Ignoring read timeout in RBackendHandler")
Member:

logWarning?

@falaki (Contributor, Author) commented Oct 20, 2016

Thanks @felixcheung, addressed your comments.

@felixcheung (Member)

LGTM

SparkQA commented Oct 20, 2016

Test build #67287 has finished for PR 15471 at commit 749c8b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shivaram (Contributor)

@falaki @felixcheung Since this is a big change I'd like to also take a look at this once - Will try to get to it tonight.

@felixcheung (Member)

sure. I think the target is 2.0.2 - it will be good to review this more closely.

@falaki (Contributor, Author) commented Oct 20, 2016

Thanks @shivaram. I ran a real workload consisting of long running parallel simulations that took about 3.5 hours. I also tested it by calling Sys.sleep() inside workers with dapply and spark.lapply. Would be good to torture it in new ways.

Also would be interesting to think about ways of unit-testing this.

@shivaram (Contributor) left a comment

Thanks @falaki -- This is a very useful change. I just did a pass of comments.

# Worker daemon

rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
connectionTimeout <- Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT")
Contributor:

We should take a default value here and in worker.R as well - 6000 is fine to use as default everywhere

bootElap <- elapsedSecs()

rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
connectionTimeout <- Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT")
Contributor:

Default value here as well

// Connection timeout is set by socket client. To make it configurable we will pass the
// timeout value to client inside the temp file
val conf = new SparkConf()
val backendConnectionTimeout = conf.getInt(
Contributor:

Are we sure SparkConf has been initialized successfully at this point? Or, to put it another way, in which cases does this code path get called? Is this in the spark-submit case, the shell, etc.?

Contributor Author:

This is for spark-submit. Basically the JVM starts before the R process, so the only way for the R process to get these configuration parameters is from the JVM; in this case, RBackend sets environment variables based on the configs.

For the other mode, where the JVM is started after the R process, we send this timeout value through the TCP connection.

At least that is my current understanding of how the deploy modes work. In our production environment we launch the R process from the JVM.
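As a rough illustration of the spark-submit direction (not the actual RRunner code; the script name and literal timeout value are placeholders, while the SPARKR_BACKEND_CONNECTION_TIMEOUT variable name matches the one used in this diff), the JVM can only hand the value to R through the child process environment:

```scala
import java.io.File
import scala.sys.process.Process

// Illustrative sketch: in the spark-submit case the JVM launches the R process,
// so configuration such as the connection timeout travels through the child's
// environment rather than over the socket.
object LaunchRSketch {
  def main(args: Array[String]): Unit = {
    val backendConnectionTimeout = 6000  // placeholder; would come from SparkConf
    val builder = Process(
      Seq("Rscript", "sparkr_script.R"),  // placeholder script name
      Option.empty[File],                 // inherit the working directory
      "SPARKR_BACKEND_CONNECTION_TIMEOUT" -> backendConnectionTimeout.toString)
    builder.run()  // the R side reads the value back with Sys.getenv()
  }
}
```

On the R side the value is then read back with Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT"), as the worker and daemon scripts in this diff do.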

var rCommand = sparkConf.get("spark.sparkr.r.command", "Rscript")
rCommand = sparkConf.get("spark.r.command", rCommand)

val rConnectionTimeout = SparkEnv.get.conf.getInt(
Contributor:

Can we just use sparkConf.get, similar to the line above?

// To avoid timeouts when reading results in SparkR driver, we will be regularly sending
// heartbeat responses. We use special code +1 to signal the client that backend is
// alive and it should continue blocking for result.
val execService = ThreadUtils.newDaemonSingleThreadScheduledExecutor("SparkRKeepAliveThread")
Contributor:

I'm not sure how expensive it is to create and destroy an executor service each time. Can we just schedule at a fixed rate when we get the request and then cancel the scheduled task at the end of the request?

Contributor Author:

I was not sure about this either. I used this method based on advice from @zsxwing.

Contributor:

Hmm - my question on whether we can reuse this still stands. @zsxwing do you think that's possible?

Member:

@shivaram we can reuse it. scheduleAtFixedRate returns a ScheduledFuture, which can be used to cancel the task. However, there is no awaitTermination for a ScheduledFuture after cancelling it, so we would need some extra work.

Contributor:

I took a closer look at this and it looks like the executor just calls cancel on the tasks during shutdown, so that part of the behavior is the same as calling cancel on the task we have [1]. But you are right that if we wanted to wait for termination we'd need to do some extra work. We could use the get call, but it's unclear what the semantics of that are. It might be easier to just set up a semaphore or mutex that is shared by the runnable and the outside thread.

But overall it looks like thread pool creation is only around 100 microseconds [2], and I also benchmarked this locally.

[1] http://www.docjar.com/html/api/java/util/concurrent/ScheduledThreadPoolExecutor.java.html line 367
[2] http://stackoverflow.com/a/5483467/4577954
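For concreteness, here is a rough sketch of the "reuse the scheduler, cancel the task per request" idea discussed above. This is not the actual RBackendHandler code: the names are made up, and a shared lock stands in for the missing awaitTermination so that a heartbeat write cannot interleave with the final result write.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

// Illustrative only: schedule a heartbeat task when a request arrives, cancel
// it once the result is ready, and use a shared lock so the result write waits
// for any heartbeat write that is still in flight. (Error handling elided.)
object KeepAliveSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private val writeLock = new ReentrantLock()

  def runWithHeartbeats[T](intervalMs: Long)(sendHeartbeat: () => Unit)
                          (compute: () => T)(writeResult: T => Unit): Unit = {
    val task = scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        writeLock.lock()
        try sendHeartbeat() finally writeLock.unlock()
      }
    }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)

    val result = compute()   // the potentially long-running method invocation
    task.cancel(false)       // no further heartbeat runs will be started
    writeLock.lock()         // wait out a heartbeat that is mid-write, if any
    try writeResult(result) finally writeLock.unlock()
  }
}
```

Whether this buys anything over recreating the executor per request comes down to the roughly 100-microsecond thread pool creation cost measured above.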

@falaki (Contributor, Author) commented Oct 26, 2016

@shivaram sorry for the delay getting back to this. Please take another look.

SparkQA commented Oct 27, 2016

Test build #67602 has finished for PR 15471 at commit 666b609.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) commented Oct 27, 2016

retest this please

SparkQA commented Oct 27, 2016

Test build #67661 has finished for PR 15471 at commit 666b609.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@falaki (Contributor, Author) commented Oct 27, 2016

retest this please

SparkQA commented Oct 27, 2016

Test build #67667 has finished for PR 15471 at commit 666b609.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@falaki (Contributor, Author) commented Oct 29, 2016

@shivaram is there a chance this makes it to the 2.0.2 release?

@shivaram (Contributor)

Taking another look now

@shivaram (Contributor)

@falaki The code change looks pretty good to me, but I'm still a bit worried about introducing a big change in a minor release. Can we have this disabled by default and flip the flag only in the master branch?

@HyukjinKwon Is there any way to retrigger the AppVeyor build?

@HyukjinKwon (Member) commented Oct 29, 2016

Build started: [SparkR] ALL PR-15471
Diff: master...spark-test:1175779D-A053-45AF-BC6C-EA34931CFC37

@shivaram I first thought committers could access Apache's AppVeyor account, but it seems not - we have to go to the Web UI and click the rebuild button. So I made (locally) a bunch of scripts to launch a build via the @spark-test account for such cases. Please cc me (treat me like a bot) and I will leave comments like the one above until we find a better way to do it.

@falaki (Contributor, Author) commented Oct 29, 2016

@shivaram that is fine. We can merge it to 2.1 (or whatever the next major release is going to be).

@shivaram (Contributor)

Thanks @HyukjinKwon - the AppVeyor tests seem to pass as well.

The change LGTM to merge to master. @felixcheung any other comments?

@felixcheung (Member)

LGTM.

@felixcheung (Member)

merged to master.

@asfgit closed this in 2881a2d Oct 30, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
## What changes were proposed in this pull request?

This patch makes RBackend connection timeout configurable by user.

## How was this patch tested?
N/A

Author: Hossein <[email protected]>

Closes apache#15471 from falaki/SPARK-17919.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
## What changes were proposed in this pull request?

This patch makes RBackend connection timeout configurable by user.

## How was this patch tested?
N/A

Author: Hossein <[email protected]>

Closes apache#15471 from falaki/SPARK-17919.

# Backend will send -1 as keep alive value to prevent various connection timeouts
# on very long running jobs. See spark.r.heartBeatInterval
while (returnStatus == 1) {

Shouldn't there be a retry limit on the returnStatus check to avoid an infinite loop?

I hit an infinite loop when this is called by Toree's sparkr_runner.R, with the error message "Failed to connect JVM: Error in socketConnection(host = hostname, port = port, server = FALSE, : argument "timeout" is missing, with no default"

Contributor:

@falaki @felixcheung any thoughts on this?

Member:

+1, I think it's a good idea to avoid an infinite loop in general.
How is Toree calling this?
Could you open a JIRA?
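As a sketch of the suggested retry limit (illustrative only; the real loop lives in SparkR's invokeJava, and the cap chosen here is an assumption, while the keep-alive marker value 1 matches the loop condition quoted above), bounding the number of consecutive keep-alive markers turns a hung backend into an error instead of an infinite loop:

```scala
import java.io.{DataInputStream, IOException}

object BoundedKeepAliveSketch {
  // Reads status codes until a non-keep-alive value arrives, but gives up
  // after maxKeepAlives consecutive keep-alive markers instead of spinning forever.
  def readReturnStatus(in: DataInputStream, maxKeepAlives: Int = 10000): Int = {
    var status = in.readInt()
    var seen = 0
    while (status == 1) {          // 1 is the keep-alive marker in the quoted loop
      seen += 1
      if (seen > maxKeepAlives) {
        throw new IOException(s"No result after $maxKeepAlives keep-alive messages")
      }
      status = in.readInt()
    }
    status
  }
}
```

Whatever the exact cap, failing loudly seems preferable to the hang described above.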
