diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 436d8bcaa..1f9e34a41 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -71,8 +71,8 @@ jobs:
     - name: Run the tests
      if: ${{ !startsWith(matrix.python-version, 'pypy') && !startsWith(matrix.os, 'windows') }}
      run: |
-        args="-vv --cov jupyter_client --cov-branch --cov-report term-missing:skip-covered --cov-fail-under 70"
-        python -m pytest $args || python -m pytest $args --lf
+        args="-vv --cov jupyter_client --cov-branch --cov-report term-missing:skip-covered"
+        python -m pytest $args --cov-fail-under 70 || python -m pytest $args --lf
     - name: Run the tests on pypy and windows
      if: ${{ startsWith(matrix.python-version, 'pypy') || startsWith(matrix.os, 'windows') }}
      run: |
@@ -150,3 +150,5 @@ jobs:
     steps:
      - uses: jupyterlab/maintainer-tools/.github/actions/base-setup@v1
      - uses: jupyterlab/maintainer-tools/.github/actions/test-sdist@v1
+        with:
+          test_command: pytest -vv || pytest -vv --lf
diff --git a/jupyter_client/channels.py b/jupyter_client/channels.py
index 584f26f7a..7be778054 100644
--- a/jupyter_client/channels.py
+++ b/jupyter_client/channels.py
@@ -109,6 +109,7 @@ def run(self) -> None:
         loop = asyncio.new_event_loop()
         asyncio.set_event_loop(loop)
         loop.run_until_complete(self._async_run())
+        loop.close()
 
     async def _async_run(self) -> None:
         """The thread's main activity. Call start() instead."""
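# Illustrative sketch (not jupyter_client code) of the per-thread event-loop pattern the
# channels.py hunk above completes: a thread that creates its own asyncio loop should
# close that loop when its work is done, otherwise the loop lingers as an unclosed
# resource. "LoopThread" and "_work" are made-up names standing in for the channel
# thread and its coroutine.
import asyncio
import threading


class LoopThread(threading.Thread):
    async def _work(self) -> None:
        await asyncio.sleep(0)  # stand-in for the channel's real coroutine

    def run(self) -> None:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self._work())
        finally:
            loop.close()  # the step the diff adds: release the loop's resources


t = LoopThread()
t.start()
t.join()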
diff --git a/jupyter_client/client.py b/jupyter_client/client.py
index 7dd64731e..5ffa069d9 100644
--- a/jupyter_client/client.py
+++ b/jupyter_client/client.py
@@ -93,7 +93,7 @@ class KernelClient(ConnectionFileMixin):
 
     # The PyZMQ Context to use for communication with the kernel.
     context = Instance(zmq.asyncio.Context)
-    _created_context: Bool = Bool(False)
+    _created_context = Bool(False)
 
     def _context_default(self) -> zmq.asyncio.Context:
         self._created_context = True
@@ -116,6 +116,23 @@ def _context_default(self) -> zmq.asyncio.Context:
     # flag for whether execute requests should be allowed to call raw_input:
     allow_stdin: bool = True
 
+    def __del__(self):
+        """Handle garbage collection. Destroy context if applicable."""
+        if self._created_context and self.context and not self.context.closed:
+            if self.channels_running:
+                if self.log:
+                    self.log.warning("Could not destroy zmq context for %s", self)
+            else:
+                if self.log:
+                    self.log.debug("Destroying zmq context for %s", self)
+                self.context.destroy()
+        try:
+            super_del = super().__del__
+        except AttributeError:
+            pass
+        else:
+            super_del()
+
     # --------------------------------------------------------------------------
     # Channel proxy methods
     # --------------------------------------------------------------------------
@@ -286,9 +303,6 @@ def start_channels(
         :meth:`start_kernel`. If the channels have been stopped and you
         call this, :class:`RuntimeError` will be raised.
         """
-        # Create the context if needed.
-        if not self._created_context:
-            self.context = self._context_default()
         if iopub:
             self.iopub_channel.start()
         if shell:
@@ -318,9 +332,6 @@ def stop_channels(self) -> None:
             self.hb_channel.stop()
         if self.control_channel.is_alive():
             self.control_channel.stop()
-        if self._created_context:
-            self._created_context = False
-            self.context.destroy()
 
     @property
     def channels_running(self) -> bool:
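# Illustrative sketch (not jupyter_client code) of the ownership rule the client.py
# hunks introduce: stop_channels() no longer destroys the zmq context; instead __del__
# destroys it only when this object created it and it is not still in use, then defers
# to any parent __del__. "ContextOwner" and "in_use" are made-up names; "in_use" stands
# in for KernelClient.channels_running.
from typing import Optional

import zmq


class ContextOwner:
    def __init__(self, context: Optional[zmq.Context] = None) -> None:
        self._created_context = context is None
        self.context = context if context is not None else zmq.Context()
        self.in_use = False

    def __del__(self) -> None:
        if self._created_context and self.context and not self.context.closed:
            if not self.in_use:
                self.context.destroy()
        try:
            super_del = super().__del__
        except AttributeError:
            pass
        else:
            super_del()


owner = ContextOwner()  # created its own context, so it is destroyed on collection
del owner

shared = zmq.Context()
borrower = ContextOwner(context=shared)  # a caller-supplied context is left alone
del borrower
shared.term()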
- """ - # Only do this if using pending kernels - if self._using_pending_kernels(): - kernel = self._kernels[kernel_id] - await kernel.ready - # Once out of a pending state, we can call shutdown. - await ensure_async(self.shutdown_kernel(kernel_id, now=now, restart=restart)) - async def _async_shutdown_kernel( self, kernel_id: str, @@ -258,22 +245,21 @@ async def _async_shutdown_kernel( Will the kernel be restarted? """ self.log.info("Kernel shutdown: %s" % kernel_id) - # If we're using pending kernels, block shutdown when a kernel is pending. - if self._using_pending_kernels() and kernel_id in self._pending_kernels: - raise RuntimeError("Kernel is in a pending state. Cannot shutdown.") # If the kernel is still starting, wait for it to be ready. - elif kernel_id in self._pending_kernels: - kernel = self._pending_kernels[kernel_id] + if kernel_id in self._pending_kernels: + task = self._pending_kernels[kernel_id] try: - await kernel + await task km = self.get_kernel(kernel_id) await t.cast(asyncio.Future, km.ready) + except asyncio.CancelledError: + pass except Exception: self.remove_kernel(kernel_id) return km = self.get_kernel(kernel_id) # If a pending kernel raised an exception, remove it. - if km.ready.exception(): + if not km.ready.cancelled() and km.ready.exception(): self.remove_kernel(kernel_id) return stopper = ensure_async(km.shutdown_kernel(now, restart)) @@ -320,13 +306,19 @@ async def _async_shutdown_all(self, now: bool = False) -> None: """Shutdown all kernels.""" kids = self.list_kernel_ids() kids += list(self._pending_kernels) - futs = [ensure_async(self._shutdown_kernel_when_ready(kid, now=now)) for kid in set(kids)] + kms = list(self._kernels.values()) + futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)] await asyncio.gather(*futs) - # When using "shutdown all", all pending kernels - # should be awaited before exiting this method. + # If using pending kernels, the kernels will not have been fully shut down. 
diff --git a/jupyter_client/tests/test_kernelmanager.py b/jupyter_client/tests/test_kernelmanager.py
index f71d74209..e1198597b 100644
--- a/jupyter_client/tests/test_kernelmanager.py
+++ b/jupyter_client/tests/test_kernelmanager.py
@@ -194,6 +194,7 @@ async def test_async_signal_kernel_subprocesses(self, name, install, expected):
 class TestKernelManager:
     def test_lifecycle(self, km):
         km.start_kernel(stdout=PIPE, stderr=PIPE)
+        kc = km.client()
         assert km.is_alive()
         is_done = km.ready.done()
         assert is_done
@@ -201,6 +202,7 @@ def test_lifecycle(self, km):
         assert km.is_alive()
         km.interrupt_kernel()
         assert isinstance(km, KernelManager)
+        kc.stop_channels()
         km.shutdown_kernel(now=True)
         assert km.context.closed
 
diff --git a/jupyter_client/tests/test_multikernelmanager.py b/jupyter_client/tests/test_multikernelmanager.py
index bf24496e0..0437e5349 100644
--- a/jupyter_client/tests/test_multikernelmanager.py
+++ b/jupyter_client/tests/test_multikernelmanager.py
@@ -88,9 +88,11 @@ def _run_lifecycle(km, test_kid=None):
         assert kid in km.list_kernel_ids()
         km.interrupt_kernel(kid)
         k = km.get_kernel(kid)
+        kc = k.client()
         assert isinstance(k, KernelManager)
         km.shutdown_kernel(kid, now=True)
         assert kid not in km, f"{kid} not in {km}"
+        kc.stop_channels()
 
     def _run_cinfo(self, km, transport, ip):
         kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
@@ -158,8 +160,10 @@ def test_start_sequence_ipc_kernels(self):
 
     def tcp_lifecycle_with_loop(self):
         # Ensure each thread has an event loop
-        asyncio.set_event_loop(asyncio.new_event_loop())
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
         self.test_tcp_lifecycle()
+        loop.close()
 
     def test_start_parallel_thread_kernels(self):
         self.test_tcp_lifecycle()
@@ -415,10 +419,6 @@ async def test_use_pending_kernels_early_shutdown(self):
         kernel = km.get_kernel(kid)
         assert not kernel.ready.done()
         # Try shutting down while the kernel is pending
-        with pytest.raises(RuntimeError):
-            await ensure_future(km.shutdown_kernel(kid, now=True))
-        await kernel.ready
-        # Shutdown once the kernel is ready
         await ensure_future(km.shutdown_kernel(kid, now=True))
         # Wait for the kernel to shutdown
         await kernel.ready
@@ -476,6 +476,7 @@ def tcp_lifecycle_with_loop(self):
         loop = asyncio.new_event_loop()
         asyncio.set_event_loop(loop)
         loop.run_until_complete(self.raw_tcp_lifecycle())
+        loop.close()
 
     # static so picklable for multiprocessing on Windows
     @classmethod
@@ -491,6 +492,7 @@ def raw_tcp_lifecycle_sync(cls, test_kid=None):
         loop = asyncio.new_event_loop()
         asyncio.set_event_loop(loop)
         loop.run_until_complete(cls.raw_tcp_lifecycle(test_kid=test_kid))
+        loop.close()
 
     @gen_test
     async def test_start_parallel_thread_kernels(self):
diff --git a/jupyter_client/threaded.py b/jupyter_client/threaded.py
index 46de54154..af7d64540 100644
--- a/jupyter_client/threaded.py
+++ b/jupyter_client/threaded.py
@@ -243,6 +243,9 @@ def stop(self) -> None:
         self.close()
         self.ioloop = None
 
+    def __del__(self):
+        self.close()
+
     def close(self) -> None:
         if self.ioloop is not None:
             try:
diff --git a/jupyter_client/utils.py b/jupyter_client/utils.py
index 9dea2bc2e..585bf1b17 100644
--- a/jupyter_client/utils.py
+++ b/jupyter_client/utils.py
@@ -13,8 +13,12 @@ def wrapped(*args, **kwargs):
         try:
             loop = asyncio.get_running_loop()
         except RuntimeError:
-            loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(loop)
+            # Workaround for bugs.python.org/issue39529.
+            try:
+                loop = asyncio.get_event_loop_policy().get_event_loop()
+            except RuntimeError:
+                loop = asyncio.new_event_loop()
+                asyncio.set_event_loop(loop)
         import nest_asyncio  # type: ignore
 
         nest_asyncio.apply(loop)
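# Illustrative sketch (not the library's code) of the loop-selection order the utils.py
# hunk adopts in run_sync(): reuse a running loop if there is one, then fall back to the
# event-loop policy's existing loop, and only create and register a fresh loop as a last
# resort (the bugs.python.org/issue39529 workaround). "get_or_create_loop" is a made-up
# helper name.
import asyncio


def get_or_create_loop() -> asyncio.AbstractEventLoop:
    try:
        return asyncio.get_running_loop()  # called from async code: reuse that loop
    except RuntimeError:
        pass
    try:
        # An event loop already registered for this thread, fetched via the policy.
        return asyncio.get_event_loop_policy().get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()  # e.g. first use inside a worker thread
        asyncio.set_event_loop(loop)
        return loop


print(get_or_create_loop())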
diff --git a/pyproject.toml b/pyproject.toml
index 597c47544..24e16a3db 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,7 +41,7 @@ filterwarnings= [
   # Fail on warnings
   "error",
 
-  # Workarounds for https://github.com/pytest-dev/pytest-asyncio/issues/77
+  # We need to handle properly closing loops as part of https://github.com/jupyter/jupyter_client/issues/755.
   "ignore:unclosed