Conversation

@jey
Contributor

jey commented Apr 11, 2013

This is an initial patch to reduce the startup time of PySpark workers by preforking the workers. Code review would be appreciated. Currently passes all PySpark tests, and additional tests will be added.

ghost assigned JoshRosen Apr 11, 2013
Member

Change this to

if (hasNext) {
 _nextObj = read()
}

to match the project code style (in general it's good to have if blocks delimited with braces in case we expand them)

@JoshRosen
Member

Disclaimer: I haven't hacked on preforking servers before, so my question might be naive.

I think that our goal here is to avoid fork()ing Python workers from a JVM that has a large heap (SPARK-671). I like this strategy of lazily launching a persistent Python process per worker and delegating the fork()s to it.

The top-level daemon (one per worker) forks POOLSIZE workers, which share a listening socket. When PythonRDD wants to spawn a worker Python process, it opens a connection to this socket. This connection is handled by one of the workers (whichever one grabs the OS's internal mutex that guards accept() on this socket). Rather than directly servicing this request (e.g. running worker_main) and re-entering the idle worker pool, this worker process forks a child to perform the work and re-enters the pool immediately.

What does this second level of indirection buy us compared to having the daemon manage the listening socket and fork all of the workers itself? It seems like we don't benefit from having multiple processes competing for the right to spawn workers, since everything is serialized at the accept() mutex anyway?

I could see a benefit from a pool model where the daemon preforks worker processes that run worker_main directly. Workers would notify the daemon when they exited the pool, allowing the daemon to know when it should begin accept()ing on the listening socket and forking new workers.
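For readers who haven't worked with preforking servers, here is a minimal Python sketch of the two-level structure described above (daemon, then a pool of accept()ing processes sharing one listening socket, then forked children). POOLSIZE and worker_main are the names used in this discussion; everything else (the address, the stub worker body) is purely illustrative and is not the actual daemon.py.

import os
import socket

POOLSIZE = 4  # illustrative value

def worker_main(conn):
    # Stand-in for the real PySpark worker loop, which would read a task
    # over the socket, execute it, and write results back.
    conn.close()

def pool_member(listener):
    # Each pool member blocks in accept(); the kernel serializes accept()
    # across the pool, so exactly one member picks up each connection.
    while True:
        conn, _ = listener.accept()
        if os.fork() == 0:
            # Child: service this one request, then exit.
            listener.close()
            worker_main(conn)
            os._exit(0)
        # Parent: drop its copy of the connection and re-enter the pool.
        conn.close()

def daemon():
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen(POOLSIZE)
    for _ in range(POOLSIZE):
        if os.fork() == 0:
            pool_member(listener)
            os._exit(0)
    while True:
        os.wait()  # the daemon only reaps its children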

@jey
Contributor Author

jey commented Apr 12, 2013

In this case, preforking is just a rather small optimization over accept-and-fork. When N requests arrive nearly all at once in the accept-and-fork scenario, all the requests have to be handled serially, one by one, by the manager, which has to accept, fork, accept, fork, etc. N times. By preforking the workers and having them all sit in an accept() call, we avoid that serial bottleneck in the manager, along with all of the context switches, system calls, and fork()s it involves. You're absolutely right that we lose some of this performance gain because the children call fork() themselves at just about the worst possible time: when a client has arrived! That further thrashes the kernel's scheduler and memory bus, since extra processes get created and pages get copied for a process that will do nothing but enter accept().
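For contrast, the accept-and-fork scheme being compared against would look roughly like the sketch below: a single manager owns the listening socket and must accept and fork for every request itself, which is where the serial bottleneck comes from. worker_main is the same hypothetical stub as in the earlier sketch.

import os
import socket

def worker_main(conn):
    # Same hypothetical stub as in the earlier sketch.
    conn.close()

def manager(listener):
    # One process handles every connection: accept, fork, accept, fork, ...
    # N near-simultaneous requests are therefore serviced one by one here
    # before any worker can start running.
    while True:
        conn, _ = listener.accept()
        if os.fork() == 0:
            listener.close()
            worker_main(conn)
            os._exit(0)
        conn.close()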

Member

Here's another place to put the if body in its own block

@JoshRosen
Member

What's the status of this pull request? Does the code need to be cleaned up or does the preforking need to be re-worked so that children don't immediately call fork() and thrash the scheduler?

Were you able to reproduce the performance issues that I noticed when running the AMP Camp exercises? Does this patch offer a significant performance improvement for those exercises?

@AmplabJenkins

Thank you for your pull request. All automated tests for this request have passed.
Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35/

@jey
Contributor Author

jey commented May 7, 2013

Hey Josh, my plan was to just submit it with a few cleanups, since I think the fork() right after accept() shouldn't be too big of an issue right now. I've pushed my current code to the python-optimization branch with most (or all?) of the suggested changes applied, but I haven't yet reviewed it closely to make sure there aren't any other changes I should make before considering it a final submission.

@AmplabJenkins

Thank you for your pull request. All automated tests for this request have passed.
Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42/

@JoshRosen
Member

Does Jenkins run the PySpark tests?

@jey
Contributor Author

jey commented May 9, 2013

Don't think so; we should probably make it so that both SBT and Maven will run the PySpark tests during Spark unit test execution.

@jey
Contributor Author

jey commented May 23, 2013

I've updated my python-optimization branch and it should be ready to be merged. The one caveat is that I haven't tested this under Windows, and I doubt it'll work well under Windows as-is -- we might want to do some kind of fallback to the old scheme if running on a non-POSIX system?
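As a purely illustrative note on that caveat, the kind of guard being suggested could look something like this (the function name is hypothetical and nothing like it exists in this patch); the availability of os.fork() is what separates the POSIX daemon scheme from the old one.

import os

def can_use_prefork_daemon():
    # Hypothetical check, not part of this patch: os.fork() only exists on
    # POSIX systems, so on Windows we would fall back to the old scheme of
    # launching a fresh Python subprocess per worker.
    return hasattr(os, "fork")

if can_use_prefork_daemon():
    pass  # start daemon.py and request workers through it
else:
    pass  # old behaviour: spawn a new Python subprocess per worker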

@mateiz
Member

mateiz commented May 23, 2013

Jey, did you run the PySpark tests for this?

@jey
Contributor Author

jey commented May 23, 2013

Yes, and they've been updated to include additional tests for the new daemon

@AmplabJenkins

Thank you for submitting this pull request. Unfortunately, the automated tests for this request have failed.
Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62/

@jey
Contributor Author

jey commented May 26, 2013

Jenkins, test this please

@AmplabJenkins

Thank you for submitting this pull request. Unfortunately, the automated tests for this request have failed.
Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65/

Member

Small code style note: for calls to methods that have side effects, use parentheses -- so for example, startDaemon().

@mateiz
Member

mateiz commented May 28, 2013

Hey Jey,

One issue I noticed in testing this is that Python exceptions are no longer reported to the master properly. There is still a line that writes them to stdout in worker.py, but it seems that the "worker process crashed" error caused by the raise after that is noticed first, and that's what propagates back to the master. Try it out -- this is the code I typed into pyspark:

data = sc.parallelize(range(1, 10))
data.map(lambda x: x/2).collect()
data.map(lambda x: x/0).collect()

Member

read()

@jey
Contributor Author

jey commented May 28, 2013

Good catch, I'll look into that

@AmplabJenkins

Thank you for submitting this pull request. Unfortunately, the automated tests for this request have failed.
Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139/

@jey
Contributor Author

jey commented Jun 21, 2013

Hi Matei, I've fixed the issues you pointed out. Exceptions are once again being correctly reported to the master, and I added parens for function calls with side-effects.

@AmplabJenkins

Thank you for your pull request. All automated tests for this request have passed.
Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/150/

@mateiz
Member

mateiz commented Jun 22, 2013

Thanks Jey, this looks great! I've merged it in.

mateiz added a commit that referenced this pull request Jun 22, 2013
Optimize PySpark worker invocation
mateiz merged commit 7e4b266 into mesos:master Jun 22, 2013
@mateiz
Member

mateiz commented Jun 22, 2013

FYI, I've also merged this into branch 0.7.

pwendell added a commit to andyk/mesos-spark that referenced this pull request May 5, 2014
This modifies spark-submit to do something more like the Hadoop `jar`
command. Now we have the following syntax:

./bin/spark-submit [options] user.jar [user options]

Author: Patrick Wendell <[email protected]>

Closes mesos#563 from pwendell/spark-submit and squashes the following commits:

32241fc [Patrick Wendell] Review feedback
3adfb69 [Patrick Wendell] Small fix
bc48139 [Patrick Wendell] SPARK-1606: Infer user application arguments instead of requiring --arg.
JoshRosen added a commit to JoshRosen/spark that referenced this pull request Aug 1, 2014
Currently, daemon.py forks a pool of numProcessors subprocesses, and those
processes fork themselves again to create the actual Python worker processes
that handle data.

I think that this extra layer of indirection is unnecessary and adds a lot of
complexity.  This commit attempts to remove this middle layer of subprocesses
by launching the workers directly from daemon.py.

See mesos/spark#563 for the original PR that added
daemon.py, where I raise some issues with the current design.
asfgit pushed a commit to apache/spark that referenced this pull request Aug 2, 2014
Currently, daemon.py forks a pool of numProcessors subprocesses, and those processes fork themselves again to create the actual Python worker processes that handle data.

I think that this extra layer of indirection is unnecessary and adds a lot of complexity.  This commit attempts to remove this middle layer of subprocesses by launching the workers directly from daemon.py.

See mesos/spark#563 for the original PR that added daemon.py, where I raise some issues with the current design.

Author: Josh Rosen <[email protected]>

Closes #1680 from JoshRosen/pyspark-daemon and squashes the following commits:

5abbcb9 [Josh Rosen] Replace magic number: 4 -> EINTR
5495dff [Josh Rosen] Throw IllegalStateException if worker launch fails.
b79254d [Josh Rosen] Detect failed fork() calls; improve error logging.
282c2c4 [Josh Rosen] Remove daemon.py exit logging, since it caused problems:
8554536 [Josh Rosen] Fix daemon’s shutdown(); log shutdown reason.
4e0fab8 [Josh Rosen] Remove shared-memory exit_flag; don't die on worker death.
e9892b4 [Josh Rosen] [WIP] [SPARK-2764] Simplify daemon.py process structure.
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
Currently, daemon.py forks a pool of numProcessors subprocesses, and those processes fork themselves again to create the actual Python worker processes that handle data.

I think that this extra layer of indirection is unnecessary and adds a lot of complexity.  This commit attempts to remove this middle layer of subprocesses by launching the workers directly from daemon.py.

See mesos/spark#563 for the original PR that added daemon.py, where I raise some issues with the current design.

Author: Josh Rosen <[email protected]>

Closes apache#1680 from JoshRosen/pyspark-daemon and squashes the following commits:

5abbcb9 [Josh Rosen] Replace magic number: 4 -> EINTR
5495dff [Josh Rosen] Throw IllegalStateException if worker launch fails.
b79254d [Josh Rosen] Detect failed fork() calls; improve error logging.
282c2c4 [Josh Rosen] Remove daemon.py exit logging, since it caused problems:
8554536 [Josh Rosen] Fix daemon’s shutdown(); log shutdown reason.
4e0fab8 [Josh Rosen] Remove shared-memory exit_flag; don't die on worker death.
e9892b4 [Josh Rosen] [WIP] [SPARK-2764] Simplify daemon.py process structure.
@lucktroy

As shown in https://issues.apache.org/jira/browse/SPARK-2764, by default the per-Java-worker daemon.py process launches a numCores-sized pool of subprocesses.

For example, it will launch 16 subprocesses if there are 16 cores on a worker. However, our application needs more subprocesses than that. How can I set the number, e.g. to 32 on each worker?

Thanks.

@JoshRosen
Member

@lucktroy In apache/spark#1680, linked from the pull request that you mentioned, we removed the intermediate, numCores-sized layer of subprocesses, since the only thing that the intermediate pool of processes did was process requests to fork() the processes that actually performed the work. Now, the fork() is performed directly from daemon.py. This removes some parallelism from the actual forking stage, but the impact should have been mitigated by other performance improvements, such as apache/spark#2259, which re-uses the Python worker processes across tasks.

The number of actual computation-performing processes is controlled at a higher level: there's one PySpark worker.py process per executor thread. Do you want to run more Python threads than cores? If so, you should configure your executors / workers to use more threads.
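To make that concrete with one possible knob (an assumption for illustration: a standalone-mode cluster, where SPARK_WORKER_CORES controls how many concurrent tasks, and therefore how many worker.py processes, a node will run), a worker can be made to advertise more cores than it physically has:

# conf/spark-env.sh on each worker node (standalone mode only; other
# cluster managers have their own settings for executor cores)
export SPARK_WORKER_CORES=32  # advertise 32 slots on a 16-core machine

Whether over-subscribing cores this way actually helps depends on how CPU-bound the Python workers are.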

@lucktroy

@JoshRosen Thanks for the detailed info. In our application we'd like to run more Python processes than cores. However, adding more executors / workers may not help, because the dataset is not very large. To accelerate it, we use a hybrid method that combines model-parallelism and data-parallelism, implemented with Python multi-threading so that many jobs can run at the same time.

For example, with 8 nodes (16 cores/node), we set spark.default.parallelism=8 because that is enough. When we used data-parallelism only (1 job at a time), each node launched 1 pyspark daemon.

We also tested the hybrid method: if 8 jobs are launched at the same time, each node launches 8 pyspark daemons.

Likewise, if 64 jobs are launched at the same time, we expect 64 pyspark daemons on each node. However, we see at most 16 pyspark daemons, and top shows many idle CPU resources.
