Skip to content

Commit 4e0fab8

Browse files
committed
Remove shared-memory exit_flag; don't die on worker death.
1 parent e9892b4 commit 4e0fab8

File tree

1 file changed

+5
-15
lines changed

1 file changed

+5
-15
lines changed

python/pyspark/daemon.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,22 @@
1515
# limitations under the License.
1616
#
1717

18+
import numbers
1819
import os
1920
import signal
2021
import select
2122
import socket
2223
import sys
2324
import traceback
24-
import multiprocessing
25-
from ctypes import c_bool
2625
from errno import EINTR, ECHILD
2726
from socket import AF_INET, SOCK_STREAM, SOMAXCONN
2827
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
2928
from pyspark.worker import main as worker_main
3029
from pyspark.serializers import write_int
3130

32-
exit_flag = multiprocessing.Value(c_bool, False)
33-
34-
35-
def should_exit():
36-
global exit_flag
37-
return exit_flag.value
38-
3931

4032
def compute_real_exit_code(exit_code):
4133
# SystemExit's code can be integer or string, but os._exit only accepts integers
42-
import numbers
4334
if isinstance(exit_code, numbers.Integral):
4435
return exit_code
4536
else:
@@ -108,8 +99,9 @@ def shutdown():
10899
def handle_sigchld(*args):
109100
try:
110101
pid, status = os.waitpid(0, os.WNOHANG)
111-
if status != 0 and not should_exit():
112-
raise RuntimeError("worker crashed: %s, %s" % (pid, status))
102+
if status != 0:
103+
msg = "worker %s crashed abruptly with exit status %s" % (pid, status)
104+
print >> sys.stderr, msg
113105
except EnvironmentError as err:
114106
if err.errno not in (ECHILD, EINTR):
115107
raise
@@ -118,7 +110,7 @@ def handle_sigchld(*args):
118110
# Initialization complete
119111
sys.stdout.close()
120112
try:
121-
while not should_exit():
113+
while True:
122114
try:
123115
ready_fds = select.select([0, listen_sock], [], [])[0]
124116
except select.error as ex:
@@ -140,13 +132,11 @@ def handle_sigchld(*args):
140132
traceback.print_exc()
141133
os._exit(1)
142134
else:
143-
assert should_exit()
144135
os._exit(0)
145136
else:
146137
sock.close()
147138
finally:
148139
signal.signal(SIGTERM, SIG_DFL)
149-
exit_flag.value = True
150140
# Send SIGHUP to notify workers of shutdown
151141
os.kill(0, SIGHUP)
152142

0 commit comments

Comments
 (0)