Skip to content

Commit 8554536

Browse files
committed
Fix daemon’s shutdown(); log shutdown reason.
1 parent 4e0fab8 commit 8554536

File tree

1 file changed

+14
-10
lines changed

1 file changed

+14
-10
lines changed

python/pyspark/daemon.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,17 @@ def manager():
8787
listen_host, listen_port = listen_sock.getsockname()
8888
write_int(listen_port, sys.stdout)
8989

90-
def shutdown():
91-
global exit_flag
92-
exit_flag.value = True
90+
def shutdown(code):
91+
signal.signal(SIGTERM, SIG_DFL)
92+
# Send SIGHUP to notify workers of shutdown
93+
os.kill(0, SIGHUP)
94+
exit(code)
9395

94-
# Gracefully exit on SIGTERM, don't die on SIGHUP
95-
signal.signal(SIGTERM, lambda signum, frame: shutdown())
96-
signal.signal(SIGHUP, SIG_IGN)
96+
def sig_term(signum, frame):
97+
print >> sys.stderr, "daemon.py shutting down due to SIGTERM"
98+
shutdown(1)
99+
signal.signal(SIGTERM, sig_term) # Gracefully exit on SIGTERM
100+
signal.signal(SIGHUP, SIG_IGN) # Don't die on SIGHUP
97101

98102
# Cleanup zombie children
99103
def handle_sigchld(*args):
@@ -120,7 +124,8 @@ def handle_sigchld(*args):
120124
raise
121125
if 0 in ready_fds:
122126
# Spark told us to exit by closing stdin
123-
shutdown()
127+
print >> sys.stderr, "daemon.py shutting down because Java closed stdin"
128+
shutdown(0)
124129
if listen_sock in ready_fds:
125130
sock, addr = listen_sock.accept()
126131
# Launch a worker process
@@ -136,9 +141,8 @@ def handle_sigchld(*args):
136141
else:
137142
sock.close()
138143
finally:
139-
signal.signal(SIGTERM, SIG_DFL)
140-
# Send SIGHUP to notify workers of shutdown
141-
os.kill(0, SIGHUP)
144+
print >> sys.stderr, "daemon.py shutting down due to uncaught exception"
145+
shutdown(1)
142146

143147

144148
if __name__ == '__main__':

0 commit comments

Comments
 (0)