From 1361a372405d1be4b78ee7c318c210ca39d7cfb4 Mon Sep 17 00:00:00 2001 From: Zachary Sailer Date: Fri, 20 May 2022 03:53:33 -0700 Subject: [PATCH] Basic Event Bus (#820) Co-authored-by: Afshin Taylor Darian --- jupyter_server/base/handlers.py | 4 + jupyter_server/serverapp.py | 22 +++++ jupyter_server/services/events/__init__.py | 0 jupyter_server/services/events/bus.py | 15 ++++ jupyter_server/services/events/handlers.py | 80 +++++++++++++++++++ pyproject.toml | 1 + tests/services/events/__init__.py | 0 tests/services/events/mock_event.yaml | 13 +++ .../services/events/mockextension/__init__.py | 10 +++ .../events/mockextension/mock_extension.py | 23 ++++++ .../mockextension/mock_extension_event.yaml | 13 +++ tests/services/events/test_api.py | 32 ++++++++ tests/services/events/test_extension.py | 32 ++++++++ 13 files changed, 245 insertions(+) create mode 100644 jupyter_server/services/events/__init__.py create mode 100644 jupyter_server/services/events/bus.py create mode 100644 jupyter_server/services/events/handlers.py create mode 100644 tests/services/events/__init__.py create mode 100644 tests/services/events/mock_event.yaml create mode 100644 tests/services/events/mockextension/__init__.py create mode 100644 tests/services/events/mockextension/mock_extension.py create mode 100644 tests/services/events/mockextension/mock_extension_event.yaml create mode 100644 tests/services/events/test_api.py create mode 100644 tests/services/events/test_extension.py diff --git a/jupyter_server/base/handlers.py b/jupyter_server/base/handlers.py index f0c80173d7..62104a80bb 100644 --- a/jupyter_server/base/handlers.py +++ b/jupyter_server/base/handlers.py @@ -340,6 +340,10 @@ def kernel_spec_manager(self): def config_manager(self): return self.settings["config_manager"] + @property + def event_bus(self): + return self.settings["event_bus"] + # --------------------------------------------------------------- # CORS # --------------------------------------------------------------- diff --git a/jupyter_server/serverapp.py b/jupyter_server/serverapp.py index 57dd76a365..a5f9b99d10 100644 --- a/jupyter_server/serverapp.py +++ b/jupyter_server/serverapp.py @@ -120,6 +120,7 @@ AsyncContentsManager, ContentsManager, ) +from jupyter_server.services.events.bus import EventBus from jupyter_server.services.kernels.kernelmanager import ( AsyncMappingKernelManager, MappingKernelManager, @@ -164,6 +165,7 @@ sessions=["jupyter_server.services.sessions.handlers"], shutdown=["jupyter_server.services.shutdown"], view=["jupyter_server.view.handlers"], + events=["jupyter_server.services.events.handlers"], ) # Added for backwards compatibility from classic notebook server. @@ -207,6 +209,7 @@ def __init__( session_manager, kernel_spec_manager, config_manager, + event_bus, extra_services, log, base_url, @@ -242,6 +245,7 @@ def __init__( session_manager, kernel_spec_manager, config_manager, + event_bus, extra_services, log, base_url, @@ -263,6 +267,7 @@ def init_settings( session_manager, kernel_spec_manager, config_manager, + event_bus, extra_services, log, base_url, @@ -354,6 +359,7 @@ def init_settings( config_manager=config_manager, authorizer=authorizer, identity_provider=identity_provider, + event_bus=event_bus, # handlers extra_services=extra_services, # Jupyter stuff @@ -769,6 +775,7 @@ class ServerApp(JupyterApp): GatewaySessionManager, GatewayClient, Authorizer, + EventBus, ] subcommands = dict( @@ -794,6 +801,7 @@ class ServerApp(JupyterApp): "sessions", "shutdown", "view", + "events", ) _log_formatter_cls = LogFormatter @@ -1561,6 +1569,12 @@ def _default_kernel_spec_manager_class(self): ), ) + event_bus = Instance( + EventBus, + allow_none=True, + help="An EventBus for emitting structured event data from Jupyter Server and extensions.", + ) + info_file = Unicode() @default("info_file") @@ -1906,6 +1920,10 @@ def init_logging(self): logger.parent = self.log logger.setLevel(self.log.level) + def init_eventbus(self): + """Initialize the Event Bus.""" + self.event_bus = EventBus.instance(parent=self) + def init_webapp(self): """initialize tornado webapp""" self.tornado_settings["allow_origin"] = self.allow_origin @@ -1970,6 +1988,7 @@ def init_webapp(self): self.session_manager, self.kernel_spec_manager, self.config_manager, + self.event_bus, self.extra_services, self.log, self.base_url, @@ -2436,6 +2455,7 @@ def initialize( if find_extensions: self.find_server_extensions() self.init_logging() + self.init_eventbus() self.init_server_extensions() # Special case the starter extension and load @@ -2762,6 +2782,8 @@ async def _cleanup(self): await self.cleanup_kernels() if getattr(self, "session_manager", None): self.session_manager.close() + if getattr(self, "event_bus", None): + self.event_bus.clear_instance() def start_ioloop(self): """Start the IO Loop.""" diff --git a/jupyter_server/services/events/__init__.py b/jupyter_server/services/events/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/jupyter_server/services/events/bus.py b/jupyter_server/services/events/bus.py new file mode 100644 index 0000000000..0cbdbf07e9 --- /dev/null +++ b/jupyter_server/services/events/bus.py @@ -0,0 +1,15 @@ +"""An EventBus for use in the Jupyter server. + +.. versionadded:: 2.0 +""" +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. +from jupyter_telemetry.eventlog import EventLog +from traitlets.config import SingletonConfigurable + + +class EventBus(EventLog, SingletonConfigurable): + """A singleton eventlog that behaves as an event + bus for emitting Jupyter Server (and extension) + event data. + """ diff --git a/jupyter_server/services/events/handlers.py b/jupyter_server/services/events/handlers.py new file mode 100644 index 0000000000..f861f4227d --- /dev/null +++ b/jupyter_server/services/events/handlers.py @@ -0,0 +1,80 @@ +"""A Websocket Handler for emitting Jupyter server events. + +.. versionadded:: 2.0 +""" +import logging + +from jupyter_telemetry.eventlog import _skip_message +from pythonjsonlogger import jsonlogger +from tornado import web, websocket + +from jupyter_server.base.handlers import JupyterHandler + +AUTH_RESOURCE = "events" + + +class WebSocketLoggingHandler(logging.Handler): + """Python logging handler that routes records to a Tornado websocket.""" + + def __init__(self, websocket, *args, **kwargs): + super().__init__(*args, **kwargs) + self.websocket = websocket + + def emit(self, record): + """Emit the message across the websocket""" + self.websocket.write_message(record.msg) + + +class SubscribeWebsocket( + JupyterHandler, + websocket.WebSocketHandler, +): + """Websocket handler for subscribing to events""" + + auth_resource = AUTH_RESOURCE + + def pre_get(self): + """Handles authentication/authorization when + attempting to subscribe to events emitted by + Jupyter Server's eventbus. + """ + # authenticate the request before opening the websocket + user = self.current_user + if user is None: + self.log.warning("Couldn't authenticate WebSocket connection") + raise web.HTTPError(403) + + # authorize the user. + if not self.authorizer.is_authorized(self, user, "execute", "events"): + raise web.HTTPError(403) + + async def get(self, *args, **kwargs): + self.pre_get() + res = super().get(*args, **kwargs) + await res + + @property + def event_bus(self): + """Jupyter Server's event bus that emits structured event data.""" + return self.settings["event_bus"] + + def open(self): + """Routes events that are emitted by Jupyter Server's + EventBus to a WebSocket client in the browser. + """ + self.logging_handler = WebSocketLoggingHandler(self) + # Add a JSON formatter to the handler. + formatter = jsonlogger.JsonFormatter(json_serializer=_skip_message) + self.logging_handler.setFormatter(formatter) + # To do: add an eventlog.add_handler method to jupyter_telemetry. + self.event_bus.log.addHandler(self.logging_handler) + self.event_bus.handlers.append(self.logging_handler) + + def on_close(self): + self.event_bus.log.removeHandler(self.logging_handler) + self.event_bus.handlers.remove(self.logging_handler) + + +default_handlers = [ + (r"/api/events/subscribe", SubscribeWebsocket), +] diff --git a/pyproject.toml b/pyproject.toml index 30da9d4446..104ff3a5b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ dependencies = [ "tornado>=6.1.0", "traitlets>=5.1", "websocket-client", + "jupyter_telemetry" ] [project.readme] diff --git a/tests/services/events/__init__.py b/tests/services/events/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/services/events/mock_event.yaml b/tests/services/events/mock_event.yaml new file mode 100644 index 0000000000..c25570e633 --- /dev/null +++ b/tests/services/events/mock_event.yaml @@ -0,0 +1,13 @@ +$id: event.mock.jupyter.com/message +version: 1 +title: Message +description: | + Emit a message +type: object +properties: + event_message: + title: Event Messages + description: | + Mock event message to read. +required: + - event_message diff --git a/tests/services/events/mockextension/__init__.py b/tests/services/events/mockextension/__init__.py new file mode 100644 index 0000000000..b19cb18a2e --- /dev/null +++ b/tests/services/events/mockextension/__init__.py @@ -0,0 +1,10 @@ +from .mock_extension import _load_jupyter_server_extension # noqa: F401 + +# Function that makes these extensions discoverable +# by the test functions. + + +def _jupyter_server_extension_points(): + return [ + {"module": "tests.services.events.mockextension"}, + ] diff --git a/tests/services/events/mockextension/mock_extension.py b/tests/services/events/mockextension/mock_extension.py new file mode 100644 index 0000000000..9d46554063 --- /dev/null +++ b/tests/services/events/mockextension/mock_extension.py @@ -0,0 +1,23 @@ +import pathlib + +from jupyter_server.base.handlers import JupyterHandler +from jupyter_server.utils import url_path_join + + +class MockEventHandler(JupyterHandler): + def get(self): + # Emit an event. + self.event_bus.record_event( + schema_name="event.mockextension.jupyter.com/message", + version=1, + event={"event_message": "Hello world, from mock extension!"}, + ) + + +def _load_jupyter_server_extension(serverapp): + # Register a schema with the EventBus + schema_file = pathlib.Path(__file__).parent / "mock_extension_event.yaml" + serverapp.event_bus.register_schema_file(schema_file) + serverapp.web_app.add_handlers( + ".*$", [(url_path_join(serverapp.base_url, "/mock/event"), MockEventHandler)] + ) diff --git a/tests/services/events/mockextension/mock_extension_event.yaml b/tests/services/events/mockextension/mock_extension_event.yaml new file mode 100644 index 0000000000..8a131b4151 --- /dev/null +++ b/tests/services/events/mockextension/mock_extension_event.yaml @@ -0,0 +1,13 @@ +$id: event.mockextension.jupyter.com/message +version: 1 +title: Message +description: | + Emit a message +type: object +properties: + event_message: + title: Event Message + description: | + Mock event message to read. +required: + - event_message diff --git a/tests/services/events/test_api.py b/tests/services/events/test_api.py new file mode 100644 index 0000000000..fe3aa32ead --- /dev/null +++ b/tests/services/events/test_api.py @@ -0,0 +1,32 @@ +import json +import pathlib + +import pytest + + +@pytest.fixture +def event_bus(jp_serverapp): + event_bus = jp_serverapp.event_bus + # Register the event schema defined in this directory. + schema_file = pathlib.Path(__file__).parent / "mock_event.yaml" + event_bus.register_schema_file(schema_file) + # + event_bus.allowed_schemas = ["event.mock.jupyter.com/message"] + return event_bus + + +async def test_subscribe_websocket(jp_ws_fetch, event_bus): + # Open a websocket connection. + ws = await jp_ws_fetch("/api/events/subscribe") + + event_bus.record_event( + schema_name="event.mock.jupyter.com/message", + version=1, + event={"event_message": "Hello, world!"}, + ) + message = await ws.read_message() + event_data = json.loads(message) + # Close websocket + ws.close() + + assert event_data.get("event_message") == "Hello, world!" diff --git a/tests/services/events/test_extension.py b/tests/services/events/test_extension.py new file mode 100644 index 0000000000..a47943c636 --- /dev/null +++ b/tests/services/events/test_extension.py @@ -0,0 +1,32 @@ +import json + +import pytest + + +@pytest.fixture +def jp_server_config(): + config = { + "ServerApp": { + "jpserver_extensions": {"tests.services.events.mockextension": True}, + }, + "EventBus": {"allowed_schemas": ["event.mockextension.jupyter.com/message"]}, + } + return config + + +async def test_subscribe_websocket(jp_ws_fetch, jp_fetch): + # Open an event listener websocket + ws = await jp_ws_fetch("/api/events/subscribe") + + # Hit the extension endpoint that emits an event + await jp_fetch("/mock/event") + + # Check the event listener for a message + message = await ws.read_message() + event_data = json.loads(message) + + # Close websocket + ws.close() + + # Verify that an event message was received. + assert event_data.get("event_message") == "Hello world, from mock extension!"