Skip to content

Commit

Permalink
Kernel subshells (JEP91) implementation (#1249)
Browse files Browse the repository at this point in the history
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
ianthomas23 and pre-commit-ci[bot] authored Oct 3, 2024
1 parent 0fa5439 commit 3089438
Show file tree
Hide file tree
Showing 16 changed files with 988 additions and 68 deletions.
24 changes: 24 additions & 0 deletions docs/api/ipykernel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,30 @@ Submodules
:show-inheritance:


.. automodule:: ipykernel.shellchannel
:members:
:undoc-members:
:show-inheritance:


.. automodule:: ipykernel.subshell
:members:
:undoc-members:
:show-inheritance:


.. automodule:: ipykernel.subshell_manager
:members:
:undoc-members:
:show-inheritance:


.. automodule:: ipykernel.thread
:members:
:undoc-members:
:show-inheritance:


.. automodule:: ipykernel.trio_runner
:members:
:undoc-members:
Expand Down
35 changes: 3 additions & 32 deletions ipykernel/control.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,11 @@
"""A thread for a control channel."""
from threading import Event, Thread

from anyio import create_task_group, run, to_thread
from .thread import CONTROL_THREAD_NAME, BaseThread

CONTROL_THREAD_NAME = "Control"


class ControlThread(Thread):
class ControlThread(BaseThread):
"""A thread for a control channel."""

def __init__(self, **kwargs):
"""Initialize the thread."""
Thread.__init__(self, name=CONTROL_THREAD_NAME, **kwargs)
self.pydev_do_not_trace = True
self.is_pydev_daemon_thread = True
self.__stop = Event()
self._task = None

def set_task(self, task):
self._task = task

def run(self):
"""Run the thread."""
self.name = CONTROL_THREAD_NAME
run(self._main)

async def _main(self):
async with create_task_group() as tg:
if self._task is not None:
tg.start_soon(self._task)
await to_thread.run_sync(self.__stop.wait)
tg.cancel_scope.cancel()

def stop(self):
"""Stop the thread.
This method is threadsafe.
"""
self.__stop.set()
super().__init__(name=CONTROL_THREAD_NAME, **kwargs)
2 changes: 1 addition & 1 deletion ipykernel/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, context, addr=None):
"""Initialize the heartbeat thread."""
if addr is None:
addr = ("tcp", localhost(), 0)
Thread.__init__(self, name="Heartbeat")
super().__init__(name="Heartbeat")
self.context = context
self.transport, self.ip, self.port = addr
self.original_port = self.port
Expand Down
10 changes: 5 additions & 5 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class _IOPubThread(Thread):

def __init__(self, tasks, **kwargs):
"""Initialize the thread."""
Thread.__init__(self, name="IOPub", **kwargs)
super().__init__(name="IOPub", **kwargs)
self._tasks = tasks
self.pydev_do_not_trace = True
self.is_pydev_daemon_thread = True
Expand Down Expand Up @@ -170,10 +170,10 @@ async def _handle_event(self):
for _ in range(n_events):
event_f = self._events.popleft()
event_f()
except Exception as e:
except Exception:
if self.thread.__stop.is_set():
return
raise e
raise

def _setup_pipe_in(self):
"""setup listening pipe for IOPub from forked subprocesses"""
Expand Down Expand Up @@ -202,10 +202,10 @@ async def _handle_pipe_msgs(self):
try:
while True:
await self._handle_pipe_msg()
except Exception as e:
except Exception:
if self.thread.__stop.is_set():
return
raise e
raise

async def _handle_pipe_msg(self, msg=None):
"""handle a pipe message from a subprocess"""
Expand Down
8 changes: 8 additions & 0 deletions ipykernel/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from .iostream import IOPubThread
from .ipkernel import IPythonKernel
from .parentpoller import ParentPollerUnix, ParentPollerWindows
from .shellchannel import ShellChannelThread
from .zmqshell import ZMQInteractiveShell

# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -143,6 +144,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, ConnectionFileMix
iopub_socket = Any()
iopub_thread = Any()
control_thread = Any()
shell_channel_thread = Any()

_ports = Dict()

Expand Down Expand Up @@ -367,6 +369,7 @@ def init_control(self, context):
self.control_socket.router_handover = 1

self.control_thread = ControlThread(daemon=True)
self.shell_channel_thread = ShellChannelThread(context, self.shell_socket, daemon=True)

def init_iopub(self, context):
"""Initialize the iopub channel."""
Expand Down Expand Up @@ -406,6 +409,10 @@ def close(self):
self.log.debug("Closing control thread")
self.control_thread.stop()
self.control_thread.join()
if self.shell_channel_thread and self.shell_channel_thread.is_alive():
self.log.debug("Closing shell channel thread")
self.shell_channel_thread.stop()
self.shell_channel_thread.join()

if self.debugpy_socket and not self.debugpy_socket.closed:
self.debugpy_socket.close()
Expand Down Expand Up @@ -562,6 +569,7 @@ def init_kernel(self):
debug_shell_socket=self.debug_shell_socket,
shell_socket=self.shell_socket,
control_thread=self.control_thread,
shell_channel_thread=self.shell_channel_thread,
iopub_thread=self.iopub_thread,
iopub_socket=self.iopub_socket,
stdin_socket=self.stdin_socket,
Expand Down
Loading

0 comments on commit 3089438

Please sign in to comment.