Skip to content

Commit

Permalink
Merge pull request #772 from blink1073/fix-shutdown-behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
blink1073 authored Apr 25, 2022
2 parents 5759512 + 523ba88 commit fa597d9
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 56 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ jobs:
- name: Run the tests
if: ${{ !startsWith(matrix.python-version, 'pypy') && !startsWith(matrix.os, 'windows') }}
run: |
args="-vv --cov jupyter_client --cov-branch --cov-report term-missing:skip-covered --cov-fail-under 70"
python -m pytest $args || python -m pytest $args --lf
args="-vv --cov jupyter_client --cov-branch --cov-report term-missing:skip-covered"
python -m pytest $args --cov-fail-under 70 || python -m pytest $args --lf
- name: Run the tests on pypy and windows
if: ${{ startsWith(matrix.python-version, 'pypy') || startsWith(matrix.os, 'windows') }}
run: |
Expand Down Expand Up @@ -150,3 +150,5 @@ jobs:
steps:
- uses: jupyterlab/maintainer-tools/.github/actions/base-setup@v1
- uses: jupyterlab/maintainer-tools/.github/actions/test-sdist@v1
with:
test_command: pytest --vv || pytest -vv --lf
1 change: 1 addition & 0 deletions jupyter_client/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def run(self) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._async_run())
loop.close()

async def _async_run(self) -> None:
"""The thread's main activity. Call start() instead."""
Expand Down
25 changes: 18 additions & 7 deletions jupyter_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class KernelClient(ConnectionFileMixin):
# The PyZMQ Context to use for communication with the kernel.
context = Instance(zmq.asyncio.Context)

_created_context: Bool = Bool(False)
_created_context = Bool(False)

def _context_default(self) -> zmq.asyncio.Context:
self._created_context = True
Expand All @@ -116,6 +116,23 @@ def _context_default(self) -> zmq.asyncio.Context:
# flag for whether execute requests should be allowed to call raw_input:
allow_stdin: bool = True

def __del__(self):
"""Handle garbage collection. Destroy context if applicable."""
if self._created_context and self.context and not self.context.closed:
if self.channels_running:
if self.log:
self.log.warning("Could not destroy zmq context for %s", self)
else:
if self.log:
self.log.debug("Destroying zmq context for %s", self)
self.context.destroy()
try:
super_del = super().__del__
except AttributeError:
pass
else:
super_del()

# --------------------------------------------------------------------------
# Channel proxy methods
# --------------------------------------------------------------------------
Expand Down Expand Up @@ -286,9 +303,6 @@ def start_channels(
:meth:`start_kernel`. If the channels have been stopped and you
call this, :class:`RuntimeError` will be raised.
"""
# Create the context if needed.
if not self._created_context:
self.context = self._context_default()
if iopub:
self.iopub_channel.start()
if shell:
Expand Down Expand Up @@ -318,9 +332,6 @@ def stop_channels(self) -> None:
self.hb_channel.stop()
if self.control_channel.is_alive():
self.control_channel.stop()
if self._created_context:
self._created_context = False
self.context.destroy()

@property
def channels_running(self) -> bool:
Expand Down
70 changes: 31 additions & 39 deletions jupyter_client/multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ def create_kernel_manager(*args: Any, **kwargs: Any) -> KernelManager:
help="Share a single zmq.Context to talk to all my kernels",
).tag(config=True)

_created_context = Bool(False)

context = Instance("zmq.Context")

_created_context = Bool(False)

_pending_kernels = Dict()

@property
Expand All @@ -111,7 +111,12 @@ def _context_default(self) -> zmq.Context:
self._created_context = True
return zmq.Context()

connection_dir = Unicode("")

_kernels = Dict()

def __del__(self):
"""Handle garbage collection. Destroy context if applicable."""
if self._created_context and self.context and not self.context.closed:
if self.log:
self.log.debug("Destroying zmq context for %s", self)
Expand All @@ -123,10 +128,6 @@ def __del__(self):
else:
super_del()

connection_dir = Unicode("")

_kernels = Dict()

def list_kernel_ids(self) -> t.List[str]:
"""Return a list of the kernel ids of the active kernels."""
# Create a copy so we can iterate over kernels in operations
Expand Down Expand Up @@ -171,17 +172,19 @@ async def _add_kernel_when_ready(
try:
await kernel_awaitable
self._kernels[kernel_id] = km
finally:
self._pending_kernels.pop(kernel_id, None)
except Exception as e:
self.log.exception(e)

async def _remove_kernel_when_ready(
self, kernel_id: str, kernel_awaitable: t.Awaitable
) -> None:
try:
await kernel_awaitable
self.remove_kernel(kernel_id)
finally:
self._pending_kernels.pop(kernel_id, None)
except Exception as e:
self.log.exception(e)

def _using_pending_kernels(self):
"""Returns a boolean; a clearer method for determining if
Expand All @@ -207,15 +210,15 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg
kwargs['kernel_id'] = kernel_id # Make kernel_id available to manager and provisioner

starter = ensure_async(km.start_kernel(**kwargs))
fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter))
self._pending_kernels[kernel_id] = fut
task = asyncio.create_task(self._add_kernel_when_ready(kernel_id, km, starter))
self._pending_kernels[kernel_id] = task
# Handling a Pending Kernel
if self._using_pending_kernels():
# If using pending kernels, do not block
# on the kernel start.
self._kernels[kernel_id] = km
else:
await fut
await task
# raise an exception if one occurred during kernel startup.
if km.ready.exception():
raise km.ready.exception() # type: ignore
Expand All @@ -224,22 +227,6 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg

start_kernel = run_sync(_async_start_kernel)

async def _shutdown_kernel_when_ready(
self,
kernel_id: str,
now: t.Optional[bool] = False,
restart: t.Optional[bool] = False,
) -> None:
"""Wait for a pending kernel to be ready
before shutting the kernel down.
"""
# Only do this if using pending kernels
if self._using_pending_kernels():
kernel = self._kernels[kernel_id]
await kernel.ready
# Once out of a pending state, we can call shutdown.
await ensure_async(self.shutdown_kernel(kernel_id, now=now, restart=restart))

async def _async_shutdown_kernel(
self,
kernel_id: str,
Expand All @@ -258,22 +245,21 @@ async def _async_shutdown_kernel(
Will the kernel be restarted?
"""
self.log.info("Kernel shutdown: %s" % kernel_id)
# If we're using pending kernels, block shutdown when a kernel is pending.
if self._using_pending_kernels() and kernel_id in self._pending_kernels:
raise RuntimeError("Kernel is in a pending state. Cannot shutdown.")
# If the kernel is still starting, wait for it to be ready.
elif kernel_id in self._pending_kernels:
kernel = self._pending_kernels[kernel_id]
if kernel_id in self._pending_kernels:
task = self._pending_kernels[kernel_id]
try:
await kernel
await task
km = self.get_kernel(kernel_id)
await t.cast(asyncio.Future, km.ready)
except asyncio.CancelledError:
pass
except Exception:
self.remove_kernel(kernel_id)
return
km = self.get_kernel(kernel_id)
# If a pending kernel raised an exception, remove it.
if km.ready.exception():
if not km.ready.cancelled() and km.ready.exception():
self.remove_kernel(kernel_id)
return
stopper = ensure_async(km.shutdown_kernel(now, restart))
Expand Down Expand Up @@ -320,13 +306,19 @@ async def _async_shutdown_all(self, now: bool = False) -> None:
"""Shutdown all kernels."""
kids = self.list_kernel_ids()
kids += list(self._pending_kernels)
futs = [ensure_async(self._shutdown_kernel_when_ready(kid, now=now)) for kid in set(kids)]
kms = list(self._kernels.values())
futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)]
await asyncio.gather(*futs)
# When using "shutdown all", all pending kernels
# should be awaited before exiting this method.
# If using pending kernels, the kernels will not have been fully shut down.
if self._using_pending_kernels():
for km in self._kernels.values():
await km.ready
for km in kms:
try:
await km.ready
except asyncio.CancelledError:
self._pending_kernels[km.kernel_id].cancel()
except Exception:
# Will have been logged in _add_kernel_when_ready
pass

shutdown_all = run_sync(_async_shutdown_all)

Expand Down
2 changes: 2 additions & 0 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,15 @@ async def test_async_signal_kernel_subprocesses(self, name, install, expected):
class TestKernelManager:
def test_lifecycle(self, km):
km.start_kernel(stdout=PIPE, stderr=PIPE)
kc = km.client()
assert km.is_alive()
is_done = km.ready.done()
assert is_done
km.restart_kernel(now=True)
assert km.is_alive()
km.interrupt_kernel()
assert isinstance(km, KernelManager)
kc.stop_channels()
km.shutdown_kernel(now=True)
assert km.context.closed

Expand Down
12 changes: 7 additions & 5 deletions jupyter_client/tests/test_multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ def _run_lifecycle(km, test_kid=None):
assert kid in km.list_kernel_ids()
km.interrupt_kernel(kid)
k = km.get_kernel(kid)
kc = k.client()
assert isinstance(k, KernelManager)
km.shutdown_kernel(kid, now=True)
assert kid not in km, f"{kid} not in {km}"
kc.stop_channels()

def _run_cinfo(self, km, transport, ip):
kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
Expand Down Expand Up @@ -158,8 +160,10 @@ def test_start_sequence_ipc_kernels(self):

def tcp_lifecycle_with_loop(self):
# Ensure each thread has an event loop
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.test_tcp_lifecycle()
loop.close()

def test_start_parallel_thread_kernels(self):
self.test_tcp_lifecycle()
Expand Down Expand Up @@ -415,10 +419,6 @@ async def test_use_pending_kernels_early_shutdown(self):
kernel = km.get_kernel(kid)
assert not kernel.ready.done()
# Try shutting down while the kernel is pending
with pytest.raises(RuntimeError):
await ensure_future(km.shutdown_kernel(kid, now=True))
await kernel.ready
# Shutdown once the kernel is ready
await ensure_future(km.shutdown_kernel(kid, now=True))
# Wait for the kernel to shutdown
await kernel.ready
Expand Down Expand Up @@ -476,6 +476,7 @@ def tcp_lifecycle_with_loop(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.raw_tcp_lifecycle())
loop.close()

# static so picklable for multiprocessing on Windows
@classmethod
Expand All @@ -491,6 +492,7 @@ def raw_tcp_lifecycle_sync(cls, test_kid=None):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(cls.raw_tcp_lifecycle(test_kid=test_kid))
loop.close()

@gen_test
async def test_start_parallel_thread_kernels(self):
Expand Down
3 changes: 3 additions & 0 deletions jupyter_client/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ def stop(self) -> None:
self.close()
self.ioloop = None

def __del__(self):
self.close()

def close(self) -> None:
if self.ioloop is not None:
try:
Expand Down
8 changes: 6 additions & 2 deletions jupyter_client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ def wrapped(*args, **kwargs):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Workaround for bugs.python.org/issue39529.
try:
loop = asyncio.get_event_loop_policy().get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
import nest_asyncio # type: ignore

nest_asyncio.apply(loop)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ filterwarnings= [
# Fail on warnings
"error",

# Workarounds for https://github.com/pytest-dev/pytest-asyncio/issues/77
# We need to handle properly closing loops as part of https://github.com/jupyter/jupyter_client/issues/755.
"ignore:unclosed <socket.socket:ResourceWarning",
"ignore:unclosed event loop:ResourceWarning",

Expand Down

0 comments on commit fa597d9

Please sign in to comment.