Skip to content

Commit

Permalink
Merge pull request #832 from Zsailer/jupyter_events
Browse files Browse the repository at this point in the history
  • Loading branch information
blink1073 authored Sep 13, 2022
2 parents ff85055 + d66b50e commit a6db20d
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 11 deletions.
5 changes: 5 additions & 0 deletions jupyter_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
"""Client-side implementations of the Jupyter protocol"""
import pathlib

from ._version import __version__ # noqa
from ._version import protocol_version # noqa
from ._version import protocol_version_info # noqa
from ._version import version_info # noqa

JUPYTER_CLIENT_EVENTS_URI = "https://events.jupyter.org/jupyter_client"
DEFAULT_EVENTS_SCHEMA_PATH = pathlib.Path(__file__).parent / "event_schemas"

try:
from .asynchronous import AsyncKernelClient # noqa
from .blocking import BlockingKernelClient # noqa
Expand Down
36 changes: 36 additions & 0 deletions jupyter_client/event_schemas/kernel_manager/v1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"$id": https://events.jupyter.org/jupyter_client/kernel_manager/v1
version: 1
title: Kernel Manager Events
description: |
Record actions on kernels by the KernelManager.
type: object
required:
- kernel_id
- action
properties:
kernel_id:
oneOf:
- type: string
- type: "null"
description: The kernel's unique ID.
action:
enum:
- pre_start
- launch
- post_start
- interrupt
- restart
- kill
- request_shutdown
- finish_shutdown
- cleanup_resources
- restart_started
- restart_finished
- shutdown_started
- shutdown_finished
description: |
Action performed by the KernelManager API.
caller:
type: string
enum:
- kernel_manager
36 changes: 36 additions & 0 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from enum import Enum

import zmq
from jupyter_events import EventLogger # type: ignore[import]
from traitlets import Any
from traitlets import Bool
from traitlets import default
Expand All @@ -33,6 +34,8 @@
from .provisioning import KernelProvisionerFactory as KPF
from .utils import ensure_async
from .utils import run_sync
from jupyter_client import DEFAULT_EVENTS_SCHEMA_PATH
from jupyter_client import JUPYTER_CLIENT_EVENTS_URI
from jupyter_client import KernelClient
from jupyter_client import kernelspec

Expand Down Expand Up @@ -91,6 +94,27 @@ class KernelManager(ConnectionFileMixin):
This version starts kernels with Popen.
"""

event_schema_id = JUPYTER_CLIENT_EVENTS_URI + "/kernel_manager/v1"
event_logger = Instance(EventLogger).tag(config=True)

@default("event_logger")
def _default_event_logger(self):
if self.parent and hasattr(self.parent, "event_logger"):
return self.parent.event_logger
else:
# If parent does not have an event logger, create one.
logger = EventLogger()
schema_path = DEFAULT_EVENTS_SCHEMA_PATH / "kernel_manager" / "v1.yaml"
logger.register_event_schema(schema_path)
return logger

def _emit(self, *, action: str) -> None:
"""Emit event using the core event schema from Jupyter Server's Contents Manager."""
self.event_logger.emit(
schema_id=self.event_schema_id,
data={"action": action, "kernel_id": self.kernel_id, "caller": "kernel_manager"},
)

_ready: t.Union[Future, CFuture]

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -308,6 +332,7 @@ async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> No
assert self.provisioner.has_process
# Provisioner provides the connection information. Load into kernel manager and write file.
self._force_connection_info(connection_info)
self._emit(action="launch")

_launch_kernel = run_sync(_async_launch_kernel)

Expand Down Expand Up @@ -350,6 +375,7 @@ async def _async_pre_start_kernel(
)
kw = await self.provisioner.pre_launch(**kw)
kernel_cmd = kw.pop('cmd')
self._emit(action="pre_start")
return kernel_cmd, kw

pre_start_kernel = run_sync(_async_pre_start_kernel)
Expand All @@ -366,6 +392,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None:
self._connect_control_socket()
assert self.provisioner is not None
await self.provisioner.post_launch(**kw)
self._emit(action="post_start")

post_start_kernel = run_sync(_async_post_start_kernel)

Expand Down Expand Up @@ -401,6 +428,7 @@ async def _async_request_shutdown(self, restart: bool = False) -> None:
assert self.provisioner is not None
await self.provisioner.shutdown_requested(restart=restart)
self._shutdown_status = _ShutdownStatus.ShutdownRequest
self._emit(action="request_shutdown")

request_shutdown = run_sync(_async_request_shutdown)

Expand Down Expand Up @@ -442,6 +470,7 @@ async def _async_finish_shutdown(
if self.has_kernel:
assert self.provisioner is not None
await self.provisioner.wait()
self._emit(action="finish_shutdown")

finish_shutdown = run_sync(_async_finish_shutdown)

Expand All @@ -459,6 +488,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None:

if self.provisioner:
await self.provisioner.cleanup(restart=restart)
self._emit(action="cleanup_resources")

cleanup_resources = run_sync(_async_cleanup_resources)

Expand All @@ -481,6 +511,7 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
Will this kernel be restarted after it is shutdown. When this
is True, connection files will not be cleaned up.
"""
self._emit(action="shutdown_started")
self.shutting_down = True # Used by restarter to prevent race condition
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()
Expand All @@ -498,6 +529,7 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
await ensure_async(self.finish_shutdown(restart=restart))

await ensure_async(self.cleanup_resources(restart=restart))
self._emit(action="shutdown_finished")

shutdown_kernel = run_sync(_async_shutdown_kernel)

Expand Down Expand Up @@ -528,6 +560,7 @@ async def _async_restart_kernel(
Any options specified here will overwrite those used to launch the
kernel.
"""
self._emit(action="restart_started")
if self._launch_args is None:
raise RuntimeError("Cannot restart the kernel. No previous call to 'start_kernel'.")

Expand All @@ -540,6 +573,7 @@ async def _async_restart_kernel(
# Start new kernel.
self._launch_args.update(kw)
await ensure_async(self.start_kernel(**self._launch_args))
self._emit(action="restart_finished")

restart_kernel = run_sync(_async_restart_kernel)

Expand Down Expand Up @@ -576,6 +610,7 @@ async def _async_kill_kernel(self, restart: bool = False) -> None:
# Process is no longer alive, wait and clear
if self.has_kernel:
await self.provisioner.wait()
self._emit(action="kill")

_kill_kernel = run_sync(_async_kill_kernel)

Expand All @@ -597,6 +632,7 @@ async def _async_interrupt_kernel(self) -> None:
self.session.send(self._control_socket, msg)
else:
raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
self._emit(action="interrupt")

interrupt_kernel = run_sync(_async_interrupt_kernel)

Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
"pyzmq>=23.0",
"tornado>=6.2",
"traitlets",
"jupyter_events>=0.5.0"
]

[[project.authors]]
Expand All @@ -56,9 +57,10 @@ test = [
"mypy",
"pre-commit",
"pytest",
"pytest-asyncio>=0.18",
"pytest-asyncio>=0.19",
"pytest-cov",
"pytest-timeout",
"jupyter_events[test]"
]
doc = [
"ipykernel",
Expand Down
3 changes: 3 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
pjoin = os.path.join


pytest_plugins = ["jupyter_events.pytest_plugin"]


# Handle resource limit
# Ensure a minimal soft limit of DEFAULT_SOFT if the current hard limit is at least that much.
if resource is not None:
Expand Down
62 changes: 52 additions & 10 deletions tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .utils import AsyncKMSubclass
from .utils import SyncKMSubclass
from jupyter_client import AsyncKernelManager
from jupyter_client import DEFAULT_EVENTS_SCHEMA_PATH
from jupyter_client import KernelManager
from jupyter_client.manager import _ShutdownStatus
from jupyter_client.manager import start_new_async_kernel
Expand Down Expand Up @@ -92,14 +93,14 @@ def start_kernel():


@pytest.fixture
def km(config):
km = KernelManager(config=config)
def km(config, jp_event_logger):
km = KernelManager(config=config, event_logger=jp_event_logger)
return km


@pytest.fixture
def km_subclass(config):
km = SyncKMSubclass(config=config)
def km_subclass(config, jp_event_logger):
km = SyncKMSubclass(config=config, event_logger=jp_event_logger)
return km


Expand All @@ -112,15 +113,36 @@ def zmq_context():
ctx.term()


@pytest.fixture
def jp_event_schemas():
return [DEFAULT_EVENTS_SCHEMA_PATH / "kernel_manager" / "v1.yaml"]


@pytest.fixture
def check_emitted_events(jp_read_emitted_events):
"""Check the given events where emitted"""

def _(*expected_list):
read_events = jp_read_emitted_events()
events = [e for e in read_events if e["caller"] == "kernel_manager"]
# Ensure that the number of read events match the expected events.
assert len(events) == len(expected_list)
# Loop through the events and make sure they are in order of expected.
for i, action in enumerate(expected_list):
assert "action" in events[i] and action == events[i]["action"]

return _


@pytest.fixture(params=[AsyncKernelManager, AsyncKMSubclass])
def async_km(request, config):
km = request.param(config=config)
def async_km(request, config, jp_event_logger):
km = request.param(config=config, event_logger=jp_event_logger)
return km


@pytest.fixture
def async_km_subclass(config):
km = AsyncKMSubclass(config=config)
def async_km_subclass(config, jp_event_logger):
km = AsyncKMSubclass(config=config, event_logger=jp_event_logger)
return km


Expand Down Expand Up @@ -193,18 +215,35 @@ async def test_async_signal_kernel_subprocesses(self, name, install, expected):


class TestKernelManager:
def test_lifecycle(self, km):
def test_lifecycle(self, km, jp_read_emitted_events, check_emitted_events):
km.start_kernel(stdout=PIPE, stderr=PIPE)
check_emitted_events("pre_start", "launch", "post_start")
kc = km.client()
assert km.is_alive()
is_done = km.ready.done()
assert is_done
km.restart_kernel(now=True)
check_emitted_events(
"restart_started",
"shutdown_started",
"interrupt",
"kill",
"cleanup_resources",
"shutdown_finished",
"pre_start",
"launch",
"post_start",
"restart_finished",
)
assert km.is_alive()
km.interrupt_kernel()
check_emitted_events("interrupt")
assert isinstance(km, KernelManager)
kc.stop_channels()
km.shutdown_kernel(now=True)
check_emitted_events(
"shutdown_started", "interrupt", "kill", "cleanup_resources", "shutdown_finished"
)
assert km.context.closed

def test_get_connect_info(self, km):
Expand Down Expand Up @@ -448,7 +487,10 @@ def execute(cmd):

@pytest.mark.asyncio
class TestAsyncKernelManager:
async def test_lifecycle(self, async_km):
async def test_lifecycle(
self,
async_km,
):
await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
is_alive = await async_km.is_alive()
assert is_alive
Expand Down
10 changes: 10 additions & 0 deletions tests/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,13 @@ def test_connection_file_real_path():
km._launch_args = {}
cmds = km.format_kernel_cmd()
assert cmds[4] == "foobar"


def test_kernel_manager_event_logger(jp_event_handler, jp_read_emitted_events):
action = "pre_start"
km = KernelManager()
km.event_logger.register_handler(jp_event_handler)
km._emit(action=action)
output = jp_read_emitted_events()[0]
assert "kernel_id" in output and output["kernel_id"] is None
assert "action" in output and output["action"] == action

0 comments on commit a6db20d

Please sign in to comment.