From 13137a7093946b2458095be6a29515bf9a0c73a9 Mon Sep 17 00:00:00 2001 From: Maarten Breddels Date: Tue, 28 May 2019 23:21:23 +0200 Subject: [PATCH] WIP feat(fork): Allow forking of a kernel --- ipykernel/kernelapp.py | 81 +++++++++++++++++++++++++++++++++++++++-- ipykernel/kernelbase.py | 22 ++++++++++- 2 files changed, 98 insertions(+), 5 deletions(-) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index d9eef61ab..4830d8d0b 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -501,10 +501,83 @@ def start(self): self.poller.start() self.kernel.start() self.io_loop = ioloop.IOLoop.current() - try: - self.io_loop.start() - except KeyboardInterrupt: - pass + keep_running = True + while keep_running: + try: + self.io_loop.start() + except KeyboardInterrupt: + pass + if not getattr(self.io_loop, '_fork_requested', False): + keep_running = False + else: + pid = os.fork() + self.io_loop._fork_requested = False # reset for parent AND child + if pid == 0: + import asyncio + self.log.debug('Child kernel with pid ', os.getpid()) + + # try to aggresively close all sockets/ioloops etc + for socket in [self.control_socket, self.iopub_socket, self.shell_socket, self.stdin_socket]: + socket.close() + self.io_loop.close(all_fds=True) + self.io_loop.clear_current() + ioloop.IOLoop.clear_current() + + # check if the event loop is really being replaced + loop = asyncio.get_event_loop() + self.log.debug('asyncioloop: %r %r', loop, id(loop)) + + # install a new policy, sources: + # https://bugs.python.org/issue21998 + # it will give each pid a new io loop + _default_policy = asyncio.get_event_loop_policy() + _pid_loop = {} + class MultiprocessingPolicy(asyncio.AbstractEventLoopPolicy): + def get_event_loop(self): + pid = os.getpid() + loop = _pid_loop.get(pid) + if loop is None: + loop = self.new_event_loop() + _pid_loop[pid] = loop + return loop + + def set_event_loop(self, loop): + pid = os.getpid() + _pid_loop[pid] = loop + + def new_event_loop(self): + return _default_policy.new_event_loop() + + asyncio.set_event_loop_policy(MultiprocessingPolicy()) + # del _pid_loop[os.getpid()] + asyncio.new_event_loop() + loop = asyncio.get_event_loop() + self.log.debug('asyncioloop: %r %r', loop, id(loop)) + + import tornado.platform.asyncio as tasio + # explicitly create a new io look that will also be the current + self.io_loop = tasio.AsyncIOMainLoop(make_current=True) + assert self.io_loop == ioloop.IOLoop.current() + + # reset ports, so they will actually get updated + self.hb_port = 0 + self.shell_port = 0 + self.iopub_port = 0 + self.stdin_port = 0 + self.control_port = 0 + # TODO: we might want to pass the same arguments, except the file + # NOTE: we actually start a new kernel, but once this works + # we can actually think about reusing the kernel object + self.initialize(argv=['-f', 'conn_fork.json', '--debug']) + self.start() + pass + else: + self.log.debug('Parent kernel will resume') + # keep a reference, since the will set this to None + post_fork_callback = self.io_loop._post_fork_callback + self.io_loop.add_callback(lambda: post_fork_callback(pid)) + self.io_loop._post_fork_callback = None + launch_new_instance = IPKernelApp.launch_instance diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 0fbfebe53..16926675d 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -154,11 +154,31 @@ def _default_ident(self): 'connect_request', 'shutdown_request', 'is_complete_request', # deprecated: - 'apply_request', + 'apply_request', 'fork' ] # add deprecated ipyparallel control messages control_msg_types = msg_types + ['clear_request', 'abort_request'] + def fork(self, stream, ident, parent): + # Forking in the (async)io loop is not supported. + # instead, we stop it, and use the io loop to pass + # information up the callstack + loop = ioloop.IOLoop.current() + loop._fork_requested = True + def post_fork_callback(pid): + # we might be able to pass back the port information/connection + # info file here. This is just a proof of concept + reply_content = json_clean({'status': 'ok', 'fork_id': pid}) + metadata = {} + metadata = self.finish_metadata(parent, metadata, reply_content) + + reply_msg = self.session.send(stream, u'execute_reply', + reply_content, parent, metadata=metadata, + ident=ident) + + loop._post_fork_callback = post_fork_callback + loop.stop() + def __init__(self, **kwargs): super(Kernel, self).__init__(**kwargs) # Build dict of handlers for message types