Skip to content

Commit e9892b4

Browse files
committed
[WIP] [SPARK-2764] Simplify daemon.py process structure.
Currently, daemon.py forks a pool of numProcessors subprocesses, and those processes fork themselves again to create the actual Python worker processes that handle data. I think that this extra layer of indirection is unnecessary and adds a lot of complexity. This commit attempts to remove this middle layer of subprocesses by launching the workers directly from daemon.py. See mesos/spark#563 for the original PR that added daemon.py, where I raise some issues with the current design.
1 parent ef4ff00 commit e9892b4

File tree

1 file changed

+46
-85
lines changed

1 file changed

+46
-85
lines changed

python/pyspark/daemon.py

Lines changed: 46 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import os
1919
import signal
20+
import select
2021
import socket
2122
import sys
2223
import traceback
@@ -28,11 +29,6 @@
2829
from pyspark.worker import main as worker_main
2930
from pyspark.serializers import write_int
3031

31-
try:
32-
POOLSIZE = multiprocessing.cpu_count()
33-
except NotImplementedError:
34-
POOLSIZE = 4
35-
3632
exit_flag = multiprocessing.Value(c_bool, False)
3733

3834

@@ -50,29 +46,16 @@ def compute_real_exit_code(exit_code):
5046
return 1
5147

5248

53-
def worker(listen_sock):
49+
def worker(sock):
50+
"""
51+
Called by a worker process after the fork().
52+
"""
5453
# Redirect stdout to stderr
5554
os.dup2(2, 1)
5655
sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1
5756

58-
# Manager sends SIGHUP to request termination of workers in the pool
59-
def handle_sighup(*args):
60-
assert should_exit()
61-
signal.signal(SIGHUP, handle_sighup)
62-
63-
# Cleanup zombie children
64-
def handle_sigchld(*args):
65-
pid = status = None
66-
try:
67-
while (pid, status) != (0, 0):
68-
pid, status = os.waitpid(0, os.WNOHANG)
69-
except EnvironmentError as err:
70-
if err.errno == EINTR:
71-
# retry
72-
handle_sigchld()
73-
elif err.errno != ECHILD:
74-
raise
75-
signal.signal(SIGCHLD, handle_sigchld)
57+
signal.signal(SIGHUP, SIG_DFL)
58+
signal.signal(SIGCHLD, SIG_DFL)
7659

7760
# Blocks until the socket is closed by draining the input stream
7861
# until it raises an exception or returns EOF.
@@ -85,55 +68,21 @@ def waitSocketClose(sock):
8568
except:
8669
pass
8770

88-
# Handle clients
89-
while not should_exit():
90-
# Wait until a client arrives or we have to exit
91-
sock = None
92-
while not should_exit() and sock is None:
93-
try:
94-
sock, addr = listen_sock.accept()
95-
except EnvironmentError as err:
96-
if err.errno != EINTR:
97-
raise
98-
99-
if sock is not None:
100-
# Fork a child to handle the client.
101-
# The client is handled in the child so that the manager
102-
# never receives SIGCHLD unless a worker crashes.
103-
if os.fork() == 0:
104-
# Leave the worker pool
105-
signal.signal(SIGHUP, SIG_DFL)
106-
signal.signal(SIGCHLD, SIG_DFL)
107-
listen_sock.close()
108-
# Read the socket using fdopen instead of socket.makefile() because the latter
109-
# seems to be very slow; note that we need to dup() the file descriptor because
110-
# otherwise writes also cause a seek that makes us miss data on the read side.
111-
infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
112-
outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
113-
exit_code = 0
114-
try:
115-
worker_main(infile, outfile)
116-
except SystemExit as exc:
117-
exit_code = exc.code
118-
finally:
119-
outfile.flush()
120-
# The Scala side will close the socket upon task completion.
121-
waitSocketClose(sock)
122-
os._exit(compute_real_exit_code(exit_code))
123-
else:
124-
sock.close()
125-
126-
127-
def launch_worker(listen_sock):
128-
if os.fork() == 0:
129-
try:
130-
worker(listen_sock)
131-
except Exception as err:
132-
traceback.print_exc()
133-
os._exit(1)
134-
else:
135-
assert should_exit()
136-
os._exit(0)
71+
# Read the socket using fdopen instead of socket.makefile() because the latter
72+
# seems to be very slow; note that we need to dup() the file descriptor because
73+
# otherwise writes also cause a seek that makes us miss data on the read side.
74+
infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
75+
outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
76+
exit_code = 0
77+
try:
78+
worker_main(infile, outfile)
79+
except SystemExit as exc:
80+
exit_code = exc.code
81+
finally:
82+
outfile.flush()
83+
# The Scala side will close the socket upon task completion.
84+
waitSocketClose(sock)
85+
os._exit(compute_real_exit_code(exit_code))
13786

13887

13988
def manager():
@@ -143,15 +92,10 @@ def manager():
14392
# Create a listening socket on the AF_INET loopback interface
14493
listen_sock = socket.socket(AF_INET, SOCK_STREAM)
14594
listen_sock.bind(('127.0.0.1', 0))
146-
listen_sock.listen(max(1024, 2 * POOLSIZE, SOMAXCONN))
95+
listen_sock.listen(max(1024, SOMAXCONN))
14796
listen_host, listen_port = listen_sock.getsockname()
14897
write_int(listen_port, sys.stdout)
14998

150-
# Launch initial worker pool
151-
for idx in range(POOLSIZE):
152-
launch_worker(listen_sock)
153-
listen_sock.close()
154-
15599
def shutdown():
156100
global exit_flag
157101
exit_flag.value = True
@@ -176,13 +120,30 @@ def handle_sigchld(*args):
176120
try:
177121
while not should_exit():
178122
try:
179-
# Spark tells us to exit by closing stdin
180-
if os.read(0, 512) == '':
181-
shutdown()
182-
except EnvironmentError as err:
183-
if err.errno != EINTR:
184-
shutdown()
123+
ready_fds = select.select([0, listen_sock], [], [])[0]
124+
except select.error as ex:
125+
if ex[0] == 4:
126+
continue
127+
else:
185128
raise
129+
if 0 in ready_fds:
130+
# Spark told us to exit by closing stdin
131+
shutdown()
132+
if listen_sock in ready_fds:
133+
sock, addr = listen_sock.accept()
134+
# Launch a worker process
135+
if os.fork() == 0:
136+
listen_sock.close()
137+
try:
138+
worker(sock)
139+
except:
140+
traceback.print_exc()
141+
os._exit(1)
142+
else:
143+
assert should_exit()
144+
os._exit(0)
145+
else:
146+
sock.close()
186147
finally:
187148
signal.signal(SIGTERM, SIG_DFL)
188149
exit_flag.value = True

0 commit comments

Comments
 (0)