Handle warnings in tests #751

Closed

blink1073 wants to merge 36 commits

Changes from 33 commits (36 commits total)
d7b50a9
Handle warnings in tests
blink1073 Mar 6, 2022
b1484ed
set strict asyncio mode
blink1073 Mar 7, 2022
eaf2195
switch to auto mode
blink1073 Mar 7, 2022
c068a82
fix multikernelmanager tests
blink1073 Mar 7, 2022
7ccc0cf
fix provisioning
blink1073 Mar 7, 2022
4db6e4e
cleanup
blink1073 Mar 7, 2022
8de3e86
fix for min version
blink1073 Mar 7, 2022
b8935af
fix toml
blink1073 Mar 7, 2022
c729b23
try pyzmq 18
blink1073 Mar 7, 2022
40d3890
use bugfix version
blink1073 Mar 7, 2022
afdacfb
do not fail on warnings for min deps
blink1073 Mar 7, 2022
87832a2
fix event loop warning
blink1073 Mar 7, 2022
c3b432a
py10 fixups
blink1073 Mar 7, 2022
841c6bc
remove problematic test
blink1073 Mar 7, 2022
602f361
handle loop warning
blink1073 Mar 7, 2022
8945cc2
fix windows tempdir handling
blink1073 Mar 7, 2022
1edb75f
more cleanup
blink1073 Mar 7, 2022
2db6359
more cleanup
blink1073 Mar 7, 2022
77f5e4b
use latest nest-asyncio
blink1073 Mar 7, 2022
d217f59
try updated tornado
blink1073 Mar 7, 2022
2c506ec
try newer asyncio-test
blink1073 Mar 7, 2022
7a2f6d1
try newer pyzmq
blink1073 Mar 7, 2022
9868fa8
try newer ipykernel
blink1073 Mar 7, 2022
9251cc1
try removing zmq warning guard
blink1073 Mar 7, 2022
457e344
restore future warning
blink1073 Mar 7, 2022
0a75462
address review
blink1073 Mar 7, 2022
b0a321d
bump tornado
blink1073 Mar 7, 2022
ff8e482
bump zmq version
blink1073 Mar 7, 2022
d670d8c
fix version
blink1073 Mar 7, 2022
9ec08a8
relax a couple versions
blink1073 Mar 7, 2022
968b9c7
clean up ci
blink1073 Mar 7, 2022
35645f8
try a newer ipykernel
blink1073 Mar 7, 2022
8c4c472
fix shutdown all behavior
blink1073 Mar 8, 2022
d108c05
wip refactor
blink1073 Mar 12, 2022
66c6dab
more cleanup
blink1073 Mar 12, 2022
40bacf0
fix test
blink1073 Mar 12, 2022
2 changes: 2 additions & 0 deletions .github/workflows/downstream.yml
@@ -12,9 +12,11 @@ concurrency:
jobs:
tests:
runs-on: ubuntu-latest
timeout-minutes: 20
strategy:
matrix:
python-version: ["3.9"]
fail-fast: false
steps:
- name: Checkout
uses: actions/checkout@v2
8 changes: 6 additions & 2 deletions .github/workflows/main.yml
@@ -46,6 +46,8 @@ jobs:
build-n-test-n-coverage:
name: Build, test and code coverage

timeout-minutes: 15

runs-on: ${{ matrix.os }}

strategy:
@@ -73,7 +75,7 @@ jobs:
run: mypy jupyter_client --exclude '\/tests|kernelspecapp|ioloop|runapp' --install-types --non-interactive

- name: Run the tests with coverage
run: pytest --cov jupyter_client -v jupyter_client
run: pytest --cov jupyter_client -vv jupyter_client || pytest --cov jupyter_client -vv jupyter_client --lf

- name: Build the docs
run: |
@@ -87,6 +89,7 @@ jobs:
test_miniumum_verisons:
name: Test Minimum Versions
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v2
- name: Base Setup
@@ -96,10 +99,11 @@
- name: Install miniumum versions
uses: jupyterlab/maintainer-tools/.github/actions/install-minimums@v1
- name: Run the unit tests
run: pytest -vv jupyter_client || pytest -vv jupyter_client --lf
run: pytest -vv jupyter_client -W default || pytest -vv jupyter_client --lf -W default

test_prereleases:
name: Test Prereleases
timeout-minutes: 10
runs-on: ubuntu-latest
steps:
- name: Checkout
10 changes: 10 additions & 0 deletions jupyter_client/client.py
@@ -11,6 +11,7 @@

import zmq.asyncio
from traitlets import Any # type: ignore
from traitlets import Bool
from traitlets import Instance
from traitlets import Type

@@ -92,7 +93,10 @@ class KernelClient(ConnectionFileMixin):
# The PyZMQ Context to use for communication with the kernel.
context = Instance(zmq.asyncio.Context)

_created_context: Bool = Bool(False)

def _context_default(self) -> zmq.asyncio.Context:
self._created_context = True
return zmq.asyncio.Context()

# The classes to use for the various channels
@@ -282,6 +286,9 @@ def start_channels(
:meth:`start_kernel`. If the channels have been stopped and you
call this, :class:`RuntimeError` will be raised.
"""
# Create the context if needed.
if not self._created_context:
self.context = self._context_default()
if iopub:
self.iopub_channel.start()
if shell:
@@ -311,6 +318,9 @@ def stop_channels(self) -> None:
self.hb_channel.stop()
if self.control_channel.is_alive():
self.control_channel.stop()
if self._created_context:
self._created_context = False
self.context.destroy()

@property
def channels_running(self) -> bool:
2 changes: 2 additions & 0 deletions jupyter_client/kernelspec.py
@@ -31,6 +31,8 @@

class KernelSpec(HasTraits):
argv = List()
name = Unicode()
mimetype = Unicode()
display_name = Unicode()
language = Unicode()
env = Dict()
68 changes: 42 additions & 26 deletions jupyter_client/manager.py
@@ -52,33 +52,38 @@ class _ShutdownStatus(Enum):
SigkillRequest = "SigkillRequest"


def in_pending_state(method):
def in_pending_state(attr='_ready'):
"""Sets the kernel to a pending state by
creating a fresh Future for the KernelManager's `ready`
attribute. Once the method is finished, set the Future's results.
"""

@functools.wraps(method)
async def wrapper(self, *args, **kwargs):
# Create a future for the decorated method
try:
self._ready = Future()
except RuntimeError:
# No event loop running, use concurrent future
self._ready = CFuture()
try:
# call wrapped method, await, and set the result or exception.
out = await method(self, *args, **kwargs)
# Add a small sleep to ensure tests can capture the state before done
await asyncio.sleep(0.01)
self._ready.set_result(None)
return out
except Exception as e:
self._ready.set_exception(e)
self.log.exception(self._ready.exception())
raise e
def inner(method):
@functools.wraps(method)
async def wrapper(self, *args, **kwargs):
# Create a future for the decorated method
try:
asyncio.get_running_loop()
fut = Future()
except RuntimeError:
# No event loop running, use concurrent future
fut = CFuture()
Member:

If this is inside an async def, can this except branch ever occur? There should always be a running loop from inside a coroutine.

The previous strategy of setting _ready outside a coroutine might not have a running loop, but now that you've moved it into the coroutine (good, I think), there should always be a running loop.

Returning two types of Future depending on context would be tricky because they aren't interchangeable (CFuture is not awaitable without calling asyncio.wrap_future(cfuture)).
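
A minimal standalone sketch of that incompatibility (illustrative only, not code from this PR):

    import asyncio
    from concurrent.futures import Future as CFuture

    cf_fut = CFuture()
    cf_fut.set_result(None)

    async def waiter():
        # await cf_fut  # would raise TypeError: object Future can't be used in 'await' expression
        await asyncio.wrap_future(cf_fut)  # wrapping makes the concurrent future awaitable

    asyncio.run(waiter())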

Member:

An important difference of this strategy, though: the attribute will not be set until an arbitrarily-later point in time due to asyncness. The previous strategy ensured ._ready was defined immediately before any waiting, whereas putting it in the coroutine means the attribute will not be defined immediately.

Two new things to consider (may well both be covered):

  1. previous waits for self._ready that may now be called when _ready is not yet defined
  2. can _ready be set and then re-set? If so, waits for _ready may get a 'stale' state before the new _ready future is attached. This can be avoided with a non-async delattr prior to the async bit.

Contributor Author (blink1073):

Originally _ready was only defined in the constructor, which is why we had the fallback option. It seems like these futures are becoming problematic in general. Maybe what we really want is a state and a signal when the state changes, like we have on the JavaScript side.
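
A rough sketch of what a state-plus-signal API could look like using traitlets (all names here, such as KernelLifecycle and the state values, are hypothetical, not existing jupyter_client API):

    from traitlets import HasTraits, Unicode, observe

    class KernelLifecycle(HasTraits):
        state = Unicode("idle")  # hypothetical states: "idle", "starting", "started", "dead"

        @observe("state")
        def _state_changed(self, change):
            # callers subscribe to state changes instead of awaiting a ready future
            print(f"kernel state: {change['old']} -> {change['new']}")

    km = KernelLifecycle()
    km.state = "starting"
    km.state = "started"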

Contributor Author (blink1073), Mar 8, 2022:

Alternatively, if we want to define ready and shutdown_ready as "the most recent" starting and shutting-down futures, we could also add an is_ready and is_shutdown_ready for when there are no pending of either. We would use this trick that tornado used to work around the new deprecation warnings:

loop = asyncio.get_event_loop_policy().get_event_loop()  # this always returns the same loop
future = asyncio.Future(loop=loop)

We could then make the .ready and .shutdown_ready futures only available when using the AsyncKernelManager.

Contributor Author (blink1073):

Haha oh my, nevermind, I'm digesting https://bugs.python.org/issue39529

Contributor Author (blink1073):

I think part of the problem is that jupyter_client is a library, not an application, in the same way that tornado is. It looks like we also can't rely on asyncio.run in the future, since it relies on get_event_loop and set_event_loop. Here's a sketch of what we need to do:

  • The base classes will be asynchronous and call the private async versions of functions directly as needed. We will only use futures and tasks from within async methods.
  • The synchronous classes will work by wrapping the asynchronous public methods using a decorator that runs a new private asyncio loop, i.e. return asyncio.new_event_loop().run_until_complete(async_method(...)) (a rough sketch follows below).
  • We remove the .ready future and add new methods called async def wait_for_pending_start() and async def wait_for_pending_shutdown() to handle pending kernels.

With this new structure, the synchronous managers could also support pending kernels, breaking it into two synchronous steps.
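
A rough sketch of that second bullet, assuming a decorator named run_in_new_loop (illustrative only; the existing run_sync helper in jupyter_client may differ):

    import asyncio
    import functools

    def run_in_new_loop(async_method):
        @functools.wraps(async_method)
        def wrapper(self, *args, **kwargs):
            # run the coroutine to completion on a fresh private loop
            loop = asyncio.new_event_loop()
            try:
                return loop.run_until_complete(async_method(self, *args, **kwargs))
            finally:
                loop.close()
        return wrapper

    class KernelManagerSketch:
        async def _async_start_kernel(self, **kw):
            await asyncio.sleep(0)  # stand-in for the real startup work
            return "started"

        start_kernel = run_in_new_loop(_async_start_kernel)

    print(KernelManagerSketch().start_kernel())  # prints "started"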

Contributor Author (blink1073):

We can handle the ready future in this PR and defer the other parts to a separate refactor PR.

Member:

btw, concurrent.futures are generally more compatible and flexible than asyncio futures - they are threadsafe, reusable across loops, etc. If you always use a cf.Future, you can await it in a coroutine with asyncio.wrap_future(cf_future). That may be the simplest solution here. It's the inconsistency that I saw as a problem (self._ready may or may not be awaitable). Something like:

# earlier, _not_ in a coroutine
self._ready = cf.Future()

async def wait_for_ready(self):
    if self._ready is None:
        raise RuntimeError("haven't started yet!")
    # wrap cf.Future self._ready to make it awaitable
    await asyncio.wrap_future(self._ready)

ought to be more reliable.

To always use new event loops (or asyncio.run) for each sync method call, one important consideration is that all your methods completely resolve by the end of the coroutine (i.e. not hold references to the current loop for later calls, which means requiring pyzmq 22.2, among other things). A possibly simpler alternative is to maintain your own event loop reference. asyncio.get_event_loop() is deprecated because it did too many things, but that doesn't mean nobody should hold onto persistent loop references. It just means that they don't want to track a global not-running loop. If each instance had a self._loop = asyncio.new_event_loop() and always ran the sync wrappers in that loop (self._loop.run_until_complete(coro())), it should work fine. That avoids the multiple-loop problem for sync uses because it's still a single persistent loop. Each instance may happen to use a different loop, but that should not be a problem in the sync case.
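
A sketch of that per-instance persistent-loop alternative (attribute and method names are assumptions, not existing API):

    import asyncio

    class SyncManagerSketch:
        def __init__(self):
            # one private loop per instance, reused for every sync wrapper call
            self._loop = asyncio.new_event_loop()

        def _run_sync(self, coro):
            return self._loop.run_until_complete(coro)

        async def _async_do_work(self):
            await asyncio.sleep(0)  # stand-in for real async work
            return "done"

        def do_work(self):
            return self._run_sync(self._async_do_work())

    m = SyncManagerSketch()
    m.do_work()
    m.do_work()  # the same loop is reused across calls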

Contributor Author (blink1073):

Sweet, thanks for the insights Min. That sounds like a good approach.

Contributor Author (blink1073), Mar 9, 2022:

More thoughts on ready future handling:

  • Remove the public ready attribute(s), but add public is_start_pending and is_shutdown_pending flags.
  • Internally, store current ready and pending flags for start and shutdown.
  • In start(), check for _start_pending and error if true. Set _start_pending and set a new _start_ready future. Clear _start_pending when ready.
  • In shutdown(), check for _shutdown_pending and bail if true. Set _shutdown_pending and set a new _shutdown_ready future. Clear _shutdown_pending when ready.
  • In wait_for_start_ready() and wait_for_shutdown_ready(), store the current ready future and wait for it. If the new current ready is different from the stored one, recurse (a sketch of this bookkeeping follows below).
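
A sketch of that bookkeeping for the start side only (every name here - is_start_pending, _start_ready, wait_for_start_ready - is a proposal, not existing jupyter_client API):

    import asyncio

    class PendingStateSketch:
        def __init__(self):
            self.is_start_pending = False
            self._start_ready = None

        async def start(self):
            if self.is_start_pending:
                raise RuntimeError("a start is already pending")
            self.is_start_pending = True
            self._start_ready = asyncio.get_running_loop().create_future()
            try:
                await asyncio.sleep(0)  # stand-in for the real startup work
                self._start_ready.set_result(None)
            finally:
                self.is_start_pending = False

        async def wait_for_start_ready(self):
            current = self._start_ready
            if current is None:
                raise RuntimeError("start() has not been called yet")
            await current
            # a newer start may have begun while we waited; chain onto it
            if self._start_ready is not current:
                await self.wait_for_start_ready()

    async def main():
        m = PendingStateSketch()
        await asyncio.gather(m.start(), m.wait_for_start_ready())

    asyncio.run(main())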

setattr(self, attr, fut)
try:
# call wrapped method, await, and set the result or exception.
out = await method(self, *args, **kwargs)
# Add a small sleep to ensure tests can capture the state before done
await asyncio.sleep(0.01)
fut.set_result(None)
return out
except Exception as e:
fut.set_exception(e)
self.log.exception(fut.exception())
raise e

return wrapper

return wrapper
return inner


class KernelManager(ConnectionFileMixin):
@@ -90,8 +95,10 @@ class KernelManager(ConnectionFileMixin):
def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
self._shutdown_status = _ShutdownStatus.Unset
self._shutdown_ready = None
# Create a place holder future.
try:
asyncio.get_running_loop()
self._ready = Future()
except RuntimeError:
# No event loop running, use concurrent future
@@ -182,6 +189,11 @@ def ready(self) -> Future:
"""A future that resolves when the kernel process has started for the first time"""
return self._ready

@property
def shutdown_ready(self) -> Future:
"""A future that resolves when a shutdown has completed"""
return self._shutdown_ready

@property
def ipykernel(self) -> bool:
return self.kernel_name in {"python", "python2", "python3"}
@@ -360,7 +372,7 @@ async def _async_post_start_kernel(self, **kw) -> None:

post_start_kernel = run_sync(_async_post_start_kernel)

@in_pending_state
@in_pending_state()
async def _async_start_kernel(self, **kw):
"""Starts a kernel on this host in a separate process.

@@ -387,8 +399,9 @@ async def _async_request_shutdown(self, restart: bool = False) -> None:
content = dict(restart=restart)
msg = self.session.msg("shutdown_request", content=content)
# ensure control socket is connected
self._connect_control_socket()
self.session.send(self._control_socket, msg)
if self._control_socket and not self._control_socket.closed:
self._connect_control_socket()
self.session.send(self._control_socket, msg)
assert self.provisioner is not None
await self.provisioner.shutdown_requested(restart=restart)
self._shutdown_status = _ShutdownStatus.ShutdownRequest
@@ -453,7 +466,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None:

cleanup_resources = run_sync(_async_cleanup_resources)

@in_pending_state
@in_pending_state('_shutdown_ready')
async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False):
"""Attempts to stop the kernel process cleanly.

@@ -476,7 +489,10 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()

await ensure_async(self.interrupt_kernel())
# If the kernel has already started, interrupt it to give it a
# chance to clean up.
if self.has_kernel:
await ensure_async(self.interrupt_kernel())
Comment on lines +505 to +506

Member:

Nice - make this call to interrupt() conditional on start status. Good catch.

Contributor Author (blink1073):

👍🏼


if now:
await ensure_async(self._kill_kernel())
2 changes: 1 addition & 1 deletion jupyter_client/multikernelmanager.py
@@ -324,7 +324,7 @@ async def _async_shutdown_all(self, now: bool = False) -> None:
# should be awaited before exiting this method.
if self._using_pending_kernels():
for km in self._kernels.values():
await km.ready
await km.shutdown_ready

shutdown_all = run_sync(_async_shutdown_all)

5 changes: 5 additions & 0 deletions jupyter_client/provisioning/local_provisioner.py
@@ -60,6 +60,11 @@ async def wait(self) -> Optional[int]:

# Process is no longer alive, wait and clear
ret = self.process.wait()
# Make sure all the fds get closed.
for attr in ['stdout', 'stderr', 'stdin']:
fid = getattr(self.process, attr)
if fid:
fid.close()
self.process = None # allow has_process to now return False
return ret

4 changes: 1 addition & 3 deletions jupyter_client/ssh/tunnel.py
@@ -36,8 +36,6 @@ class SSHException(Exception): # type: ignore
except ImportError:
pexpect = None

from zmq.utils.strtypes import b


def select_random_ports(n):
"""Select and return n random ports that are available."""
@@ -56,7 +54,7 @@ def select_random_ports(n):
# -----------------------------------------------------------------------------
# Check for passwordless login
# -----------------------------------------------------------------------------
_password_pat = re.compile(b(r"pass(word|phrase):"), re.IGNORECASE)
_password_pat = re.compile(r"pass(word|phrase):".encode("utf8"), re.IGNORECASE)


def try_passwordless_ssh(server, keyfile, paramiko=None):
22 changes: 22 additions & 0 deletions jupyter_client/tests/conftest.py
@@ -7,9 +7,30 @@

from .utils import test_env

try:
import resource
except ImportError:
# Windows
resource = None

pjoin = os.path.join


# Handle resource limit
# Ensure a minimal soft limit of DEFAULT_SOFT if the current hard limit is at least that much.
if resource is not None:
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)

DEFAULT_SOFT = 4096
if hard >= DEFAULT_SOFT:
soft = DEFAULT_SOFT

if hard < soft:
hard = soft

resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard))


if os.name == "nt" and sys.version_info >= (3, 7):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

@@ -22,6 +43,7 @@ def event_loop():
if os.name == "nt" and sys.version_info >= (3, 7):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
loop = asyncio.SelectorEventLoop()

try:
yield loop
finally:
8 changes: 5 additions & 3 deletions jupyter_client/tests/test_client.py
@@ -22,14 +22,16 @@ class TestKernelClient(TestCase):
def setUp(self):
self.env_patch = test_env()
self.env_patch.start()
self.addCleanup(self.env_patch.stop)
try:
KernelSpecManager().get_kernel_spec(NATIVE_KERNEL_NAME)
except NoSuchKernel:
pytest.skip()
self.km, self.kc = start_new_kernel(kernel_name=NATIVE_KERNEL_NAME)
self.addCleanup(self.kc.stop_channels)
self.addCleanup(self.km.shutdown_kernel)

def tearDown(self):
self.env_patch.stop()
self.km.shutdown_kernel()
self.kc.stop_channels()

def test_execute_interactive(self):
kc = self.kc
4 changes: 4 additions & 0 deletions jupyter_client/tests/test_kernelmanager.py
@@ -441,6 +441,8 @@ def execute(cmd):
km.shutdown_kernel()
assert km.context.closed

kc.stop_channels()


@pytest.mark.asyncio
class TestAsyncKernelManager:
@@ -525,6 +527,8 @@ async def execute(cmd):
# verify that subprocesses were interrupted
assert reply["user_expressions"]["poll"] == [-signal.SIGINT] * N

await km.shutdown_kernel()

@pytest.mark.timeout(10)
async def test_start_new_async_kernel(self, install_kernel, start_async_kernel):
km, kc = start_async_kernel