[SPARK-1740] [PySpark] kill the python worker #1643
Conversation
QA tests have started for PR 1643. This patch merges cleanly.
QA results for PR 1643:
QA tests have started for PR 1643. This patch merges cleanly.
QA results for PR 1643:
python/pyspark/daemon.py
Outdated
Does sock.recv return 0 itself?
Are there any concerns (performance or otherwise) related to two different processes polling on the same file descriptor?
If an Exception is raised, n is not defined.
This thread sleeps 0.1 seconds after every poll, so the overhead is low; it should not affect the other process reading the socket.
I was wondering if the kernel does anything special to tie sockets to processes (similar to Java lock biasing).
As for n, I was asking why we bother reading it at all: why not just set something like socketClosed = False and flip it to True in the except clause, rather than reusing the variable n?
In most cases there is no exception; recv() will return 0.
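To make the idea under discussion concrete, here is a minimal sketch of the polling approach, assuming hypothetical names `worker_sock` and `worker_pid`; the PR's actual daemon.py code differs in detail. The daemon would run something like this in a background thread for each forked worker:

```python
import os
import signal
import time


def monitor_worker(worker_sock, worker_pid):
    """Kill the worker once the JVM closes its end of the socket."""
    while True:
        try:
            # recv() returns b'' (zero bytes) once the peer has closed the
            # connection; a socket error is treated the same way.
            if worker_sock.recv(4096) == b'':
                break
        except OSError:
            break
        # Sleep between polls so the monitor stays cheap and does not
        # starve the worker process reading the same socket.
        time.sleep(0.1)
    os.kill(worker_pid, signal.SIGKILL)
```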
QA tests have started for PR 1643. This patch merges cleanly.
QA results for PR 1643:
QA tests have started for PR 1643. This patch merges cleanly.
QA results for PR 1643:
Would it be possible to store the PIDs of the workers inside of PythonWorkerFactory and directly kill the workers via SIGTERM? Or send a command to the Python daemon and have it kill the workers? That seems like it would be less complex than this socket-polling approach.
I had tried both of these approaches. There is no easy way to get the PIDs of the workers, because it is platform dependent. Sending a command to the Python daemon would also need the identity of the worker, which requires another channel for commands. Also, exceptions in the Java reading/writing threads may close the socket, so we still need this kind of mechanism to kill the worker when the socket is closed.
I don't think we have platform-dependent problems using PIDs, since Windows doesn't use the daemon to launch its PySpark workers; we only have to support Unix platforms in this code.
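For comparison, a rough sketch of the PID-based alternative suggested here, assuming a Unix-only daemon; `workers`, `launch_worker`, and `kill_worker` are hypothetical names, not PySpark APIs. The daemon would record each child's PID at fork time and kill it directly on request:

```python
import os
import signal

workers = {}  # hypothetical map from a worker's socket to its PID


def launch_worker(sock):
    pid = os.fork()
    if pid == 0:
        # Child: serve requests on `sock`, then exit.
        os._exit(0)
    # Parent (daemon): remember which PID serves this socket.
    workers[sock] = pid
    return pid


def kill_worker(sock):
    # Invoked when the JVM asks the daemon to stop a specific worker.
    pid = workers.pop(sock, None)
    if pid is not None:
        os.kill(pid, signal.SIGTERM)
```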
I think that some of the existing … One note: I'm a bit wary of the mixture of fork() and Thread in the same daemon's forked process; is this safe?
Good question. It is dangerous to mix threads and fork(); it may cause a deadlock in the child process. But in this case, because of the GIL, when fork() happens the monitor thread is blocked, sleeping, or polling, all of which are safe points, so it will not be a problem.
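A small, hedged illustration of why this combination is considered safe here (a sketch, not the PR's code): fork() copies only the calling thread into the child, so trouble arises only if another thread held a lock at the moment of the fork, and a monitor thread that merely sleeps or blocks in recv() holds none.

```python
import os
import threading
import time


def monitor():
    # Only sleeps/polls; it never holds locks, so forking while it runs
    # cannot leave a lock stuck in the child.
    while True:
        time.sleep(0.1)


threading.Thread(target=monitor, daemon=True).start()

pid = os.fork()
if pid == 0:
    # Child: only the thread that called fork() exists here.
    print("threads in child:", threading.active_count())  # expected: 1
    os._exit(0)
os.waitpid(pid, 0)
```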
I will wait for your patch, and think about using PIDs.
@JoshRosen I have redone this PR based on your cleanup, please review again.
QA tests have started for PR 1643. This patch merges cleanly.
QA results for PR 1643:
python/pyspark/daemon.py
Outdated
I think that os.fork() already handles negative return values by throwing OSError, so I think this else block is dead code: https://docs.python.org/2/library/os.html#os.fork
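For reference, a brief sketch of the point being made (not the PR's code): os.fork() signals failure by raising OSError rather than returning a negative value, so the error path belongs in an except clause.

```python
import os

try:
    pid = os.fork()
except OSError as e:
    # fork() failed (e.g. the process limit was hit); no negative PID
    # is ever returned to Python code.
    print("fork failed:", e)
else:
    if pid == 0:
        os._exit(0)      # child
    os.waitpid(pid, 0)   # parent
```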
QA tests have started for PR 1643. This patch merges cleanly.
The other accesses of daemonWorkers are guarded by synchronized blocks; does this access also need synchronization? It looks like calls to stopWorker() only occur from destroyPythonWorker(), which is synchronized using the SparkEnv object, but that's a different lock. To be on the safe side, we should probably add synchronized here unless there's a good reason not to.
Actually, I think the current synchronization is fine: every call of PythonWorkerFactory's public methods is guarded by SparkEnv's lock.
This looks good overall and I'd say it's ready to merge once we address my last two comments.
QA results for PR 1643:
fix several bugs
QA tests have started for PR 1643. This patch merges cleanly.
I had fixed several bugs and improve the kill approach, it has unit tests now. |
|
QA results for PR 1643: |
|
Thanks for updating this! In my earlier review, I had overlooked that the kill involved forking the JVM; your new approach of having the daemon kill the workers is much better. The test case looks good, too (clever use of Python's …). In #1680, there was some discussion over whether to use SIGKILL vs. SIGHUP to kill the Python workers. Now that I've had more time to think about it, I think SIGKILL is a fine approach: …
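As a short, hedged illustration of the trade-off mentioned here: a worker can catch or ignore SIGHUP, but SIGKILL cannot be caught, blocked, or ignored, so it terminates the worker even if it is stuck in native code.

```python
import signal

# A worker process could simply ignore SIGHUP:
signal.signal(signal.SIGHUP, signal.SIG_IGN)

# SIGKILL, by contrast, cannot have a handler installed at all;
# the line below raises OSError if uncommented.
# signal.signal(signal.SIGKILL, signal.SIG_IGN)

# So the daemon's os.kill(pid, signal.SIGKILL) is guaranteed to stop the worker.
```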
I've merged this into …
Kill only the python worker related to cancelled tasks.

The daemon will start a background thread to monitor all the opened sockets for all workers. If the socket is closed by the JVM, this thread will kill the worker.

When a task is cancelled, the socket to the worker will be closed, and then the worker will be killed by the daemon.

Author: Davies Liu <[email protected]>

Closes #1643 from davies/kill and squashes the following commits:

8ffe9f3 [Davies Liu] kill worker by daemon, because runtime.exec() is too heavy
46ca150 [Davies Liu] address comment
acd751c [Davies Liu] kill the worker when task is canceled

(cherry picked from commit 55349f9)
Signed-off-by: Josh Rosen <[email protected]>
@JoshRosen Thanks for reviewing this; your comments helped me a lot.