diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml index 08c20a2913..4fdffb2816 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/python-tests.yml @@ -125,6 +125,7 @@ jobs: pip check - name: Run the tests run: | + pip install jupyter_client@https://github.com/blink1073/jupyter_client/archive/refs/heads/synchronous_managers.zip pytest -vv || pytest -vv --lf make_sdist: diff --git a/jupyter_server/services/kernels/handlers.py b/jupyter_server/services/kernels/handlers.py index 30e3da329f..91ce03826e 100644 --- a/jupyter_server/services/kernels/handlers.py +++ b/jupyter_server/services/kernels/handlers.py @@ -108,6 +108,15 @@ async def post(self, kernel_id, action): self.finish() +def _ensure_future(f): + """Wrap a concurrent future as an asyncio future if there is a running loop.""" + try: + asyncio.get_running_loop() + return asyncio.wrap_future(f) + except RuntimeError: + return f + + class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): """There is one ZMQChannelsHandler per running kernel and it oversees all the sessions. @@ -186,7 +195,7 @@ def nudge(self): self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id) f: Future = Future() f.set_result(None) - return f + return _ensure_future(f) # Use a transient shell channel to prevent leaking # shell responses to the front-end. shell_channel = kernel.connect_shell() @@ -287,7 +296,7 @@ def nudge(count): future = gen.with_timeout(loop.time() + self.kernel_info_timeout, both_done) # ensure we have no dangling resources or unresolved Futures in case of timeout future.add_done_callback(finish) - return future + return _ensure_future(future) def request_kernel_info(self): """send a request for kernel_info""" @@ -311,7 +320,7 @@ def request_kernel_info(self): if not future.done(): self.log.debug("Waiting for pending kernel_info request") future.add_done_callback(lambda f: self._finish_kernel_info(f.result())) - return self._kernel_info_future + return _ensure_future(self._kernel_info_future) def _handle_kernel_info_reply(self, msg): """process the kernel_info_reply @@ -704,7 +713,7 @@ def _limit_rate(self, channel, msg, msg_list): def close(self): super().close() - return self._close_future + return _ensure_future(self._close_future) def on_close(self): self.log.debug("Websocket closed %s", self.session_key)