Skip to content

Commit

Permalink
Registers a SIGCHLD handler to join worker processes upon model scale down
Browse files Browse the repository at this point in the history
  • Loading branch information
maaquib committed Jun 12, 2020
1 parent a463ec0 commit 6ed099a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,13 @@ public void shutdown() {
thread.interrupt();
aggregator.sendError(
null, "Worker scaled down.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
if (this.serverThread) {
try {
thread.join(60L);
} catch (InterruptedException e) {
logger.error("Server thread join interrupted");
}
}
}
}

Expand Down
17 changes: 16 additions & 1 deletion mms/model_service_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

# pylint: disable=redefined-builtin

import errno
import logging
import os
import multiprocessing
Expand Down Expand Up @@ -161,6 +162,10 @@ def sigterm_handler(self):
except OSError:
pass

def sigchld_handler(self):
    """
    Reap worker processes that have exited and log how many are still alive.

    Takes no signal arguments; it only queries multiprocessing for the current children.
    :return: None
    """
    # Asking multiprocessing for its live children also joins every child that has
    # already finished, so this lookup is what actually cleans up exited workers.
    live_workers = multiprocessing.active_children()
    worker_count = len(live_workers)
    logging.info('Active worker count: %s', worker_count)

def start_worker(self, cl_socket):
"""
Method to start the worker threads. These worker threads use multiprocessing to spawn a new worker.
Expand Down Expand Up @@ -199,12 +204,22 @@ def run_server(self):
logging.info("[PID] %d", os.getpid())
logging.info("MMS worker started.")
logging.info("Python runtime: %s", platform.python_version())
signal.signal(signal.SIGCHLD, lambda signum, frame: self.sigchld_handler())
while True:
if self.service is None and self.preload is True:
# Lazy loading the models
self.load_model(self.model_meta_data)

(cl_socket, _) = self.sock.accept()
# Fix for sock.accept() not ignoring EINTR on SIGCHLD in python2
# https://www.python.org/dev/peps/pep-0475/ https://bugs.python.org/issue17097
while True:
try:
(cl_socket, _) = self.sock.accept()
except socket.error as e:
if e.args[0] != errno.EINTR:
raise
else:
break
# workaround error(35, 'Resource temporarily unavailable') on OSX
cl_socket.setblocking(True)

Expand Down

0 comments on commit 6ed099a

Please sign in to comment.