Skip to content

Commit

Permalink
Basic Event Bus (#820)
Browse files Browse the repository at this point in the history
Co-authored-by: Afshin Taylor Darian <[email protected]>
  • Loading branch information
Zsailer and afshin authored May 20, 2022
1 parent d376780 commit 1361a37
Show file tree
Hide file tree
Showing 13 changed files with 245 additions and 0 deletions.
4 changes: 4 additions & 0 deletions jupyter_server/base/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ def kernel_spec_manager(self):
def config_manager(self):
return self.settings["config_manager"]

@property
def event_bus(self):
return self.settings["event_bus"]

# ---------------------------------------------------------------
# CORS
# ---------------------------------------------------------------
Expand Down
22 changes: 22 additions & 0 deletions jupyter_server/serverapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
AsyncContentsManager,
ContentsManager,
)
from jupyter_server.services.events.bus import EventBus
from jupyter_server.services.kernels.kernelmanager import (
AsyncMappingKernelManager,
MappingKernelManager,
Expand Down Expand Up @@ -164,6 +165,7 @@
sessions=["jupyter_server.services.sessions.handlers"],
shutdown=["jupyter_server.services.shutdown"],
view=["jupyter_server.view.handlers"],
events=["jupyter_server.services.events.handlers"],
)

# Added for backwards compatibility from classic notebook server.
Expand Down Expand Up @@ -207,6 +209,7 @@ def __init__(
session_manager,
kernel_spec_manager,
config_manager,
event_bus,
extra_services,
log,
base_url,
Expand Down Expand Up @@ -242,6 +245,7 @@ def __init__(
session_manager,
kernel_spec_manager,
config_manager,
event_bus,
extra_services,
log,
base_url,
Expand All @@ -263,6 +267,7 @@ def init_settings(
session_manager,
kernel_spec_manager,
config_manager,
event_bus,
extra_services,
log,
base_url,
Expand Down Expand Up @@ -354,6 +359,7 @@ def init_settings(
config_manager=config_manager,
authorizer=authorizer,
identity_provider=identity_provider,
event_bus=event_bus,
# handlers
extra_services=extra_services,
# Jupyter stuff
Expand Down Expand Up @@ -769,6 +775,7 @@ class ServerApp(JupyterApp):
GatewaySessionManager,
GatewayClient,
Authorizer,
EventBus,
]

subcommands = dict(
Expand All @@ -794,6 +801,7 @@ class ServerApp(JupyterApp):
"sessions",
"shutdown",
"view",
"events",
)

_log_formatter_cls = LogFormatter
Expand Down Expand Up @@ -1561,6 +1569,12 @@ def _default_kernel_spec_manager_class(self):
),
)

event_bus = Instance(
EventBus,
allow_none=True,
help="An EventBus for emitting structured event data from Jupyter Server and extensions.",
)

info_file = Unicode()

@default("info_file")
Expand Down Expand Up @@ -1906,6 +1920,10 @@ def init_logging(self):
logger.parent = self.log
logger.setLevel(self.log.level)

def init_eventbus(self):
"""Initialize the Event Bus."""
self.event_bus = EventBus.instance(parent=self)

def init_webapp(self):
"""initialize tornado webapp"""
self.tornado_settings["allow_origin"] = self.allow_origin
Expand Down Expand Up @@ -1970,6 +1988,7 @@ def init_webapp(self):
self.session_manager,
self.kernel_spec_manager,
self.config_manager,
self.event_bus,
self.extra_services,
self.log,
self.base_url,
Expand Down Expand Up @@ -2436,6 +2455,7 @@ def initialize(
if find_extensions:
self.find_server_extensions()
self.init_logging()
self.init_eventbus()
self.init_server_extensions()

# Special case the starter extension and load
Expand Down Expand Up @@ -2762,6 +2782,8 @@ async def _cleanup(self):
await self.cleanup_kernels()
if getattr(self, "session_manager", None):
self.session_manager.close()
if getattr(self, "event_bus", None):
self.event_bus.clear_instance()

def start_ioloop(self):
"""Start the IO Loop."""
Expand Down
Empty file.
15 changes: 15 additions & 0 deletions jupyter_server/services/events/bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""An EventBus for use in the Jupyter server.
.. versionadded:: 2.0
"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
from jupyter_telemetry.eventlog import EventLog
from traitlets.config import SingletonConfigurable


class EventBus(EventLog, SingletonConfigurable):
"""A singleton eventlog that behaves as an event
bus for emitting Jupyter Server (and extension)
event data.
"""
80 changes: 80 additions & 0 deletions jupyter_server/services/events/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""A Websocket Handler for emitting Jupyter server events.
.. versionadded:: 2.0
"""
import logging

from jupyter_telemetry.eventlog import _skip_message
from pythonjsonlogger import jsonlogger
from tornado import web, websocket

from jupyter_server.base.handlers import JupyterHandler

AUTH_RESOURCE = "events"


class WebSocketLoggingHandler(logging.Handler):
"""Python logging handler that routes records to a Tornado websocket."""

def __init__(self, websocket, *args, **kwargs):
super().__init__(*args, **kwargs)
self.websocket = websocket

def emit(self, record):
"""Emit the message across the websocket"""
self.websocket.write_message(record.msg)


class SubscribeWebsocket(
JupyterHandler,
websocket.WebSocketHandler,
):
"""Websocket handler for subscribing to events"""

auth_resource = AUTH_RESOURCE

def pre_get(self):
"""Handles authentication/authorization when
attempting to subscribe to events emitted by
Jupyter Server's eventbus.
"""
# authenticate the request before opening the websocket
user = self.current_user
if user is None:
self.log.warning("Couldn't authenticate WebSocket connection")
raise web.HTTPError(403)

# authorize the user.
if not self.authorizer.is_authorized(self, user, "execute", "events"):
raise web.HTTPError(403)

async def get(self, *args, **kwargs):
self.pre_get()
res = super().get(*args, **kwargs)
await res

@property
def event_bus(self):
"""Jupyter Server's event bus that emits structured event data."""
return self.settings["event_bus"]

def open(self):
"""Routes events that are emitted by Jupyter Server's
EventBus to a WebSocket client in the browser.
"""
self.logging_handler = WebSocketLoggingHandler(self)
# Add a JSON formatter to the handler.
formatter = jsonlogger.JsonFormatter(json_serializer=_skip_message)
self.logging_handler.setFormatter(formatter)
# To do: add an eventlog.add_handler method to jupyter_telemetry.
self.event_bus.log.addHandler(self.logging_handler)
self.event_bus.handlers.append(self.logging_handler)

def on_close(self):
self.event_bus.log.removeHandler(self.logging_handler)
self.event_bus.handlers.remove(self.logging_handler)


default_handlers = [
(r"/api/events/subscribe", SubscribeWebsocket),
]
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies = [
"tornado>=6.1.0",
"traitlets>=5.1",
"websocket-client",
"jupyter_telemetry"
]

[project.readme]
Expand Down
Empty file.
13 changes: 13 additions & 0 deletions tests/services/events/mock_event.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
$id: event.mock.jupyter.com/message
version: 1
title: Message
description: |
Emit a message
type: object
properties:
event_message:
title: Event Messages
description: |
Mock event message to read.
required:
- event_message
10 changes: 10 additions & 0 deletions tests/services/events/mockextension/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .mock_extension import _load_jupyter_server_extension # noqa: F401

# Function that makes these extensions discoverable
# by the test functions.


def _jupyter_server_extension_points():
return [
{"module": "tests.services.events.mockextension"},
]
23 changes: 23 additions & 0 deletions tests/services/events/mockextension/mock_extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pathlib

from jupyter_server.base.handlers import JupyterHandler
from jupyter_server.utils import url_path_join


class MockEventHandler(JupyterHandler):
def get(self):
# Emit an event.
self.event_bus.record_event(
schema_name="event.mockextension.jupyter.com/message",
version=1,
event={"event_message": "Hello world, from mock extension!"},
)


def _load_jupyter_server_extension(serverapp):
# Register a schema with the EventBus
schema_file = pathlib.Path(__file__).parent / "mock_extension_event.yaml"
serverapp.event_bus.register_schema_file(schema_file)
serverapp.web_app.add_handlers(
".*$", [(url_path_join(serverapp.base_url, "/mock/event"), MockEventHandler)]
)
13 changes: 13 additions & 0 deletions tests/services/events/mockextension/mock_extension_event.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
$id: event.mockextension.jupyter.com/message
version: 1
title: Message
description: |
Emit a message
type: object
properties:
event_message:
title: Event Message
description: |
Mock event message to read.
required:
- event_message
32 changes: 32 additions & 0 deletions tests/services/events/test_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import json
import pathlib

import pytest


@pytest.fixture
def event_bus(jp_serverapp):
event_bus = jp_serverapp.event_bus
# Register the event schema defined in this directory.
schema_file = pathlib.Path(__file__).parent / "mock_event.yaml"
event_bus.register_schema_file(schema_file)
#
event_bus.allowed_schemas = ["event.mock.jupyter.com/message"]
return event_bus


async def test_subscribe_websocket(jp_ws_fetch, event_bus):
# Open a websocket connection.
ws = await jp_ws_fetch("/api/events/subscribe")

event_bus.record_event(
schema_name="event.mock.jupyter.com/message",
version=1,
event={"event_message": "Hello, world!"},
)
message = await ws.read_message()
event_data = json.loads(message)
# Close websocket
ws.close()

assert event_data.get("event_message") == "Hello, world!"
32 changes: 32 additions & 0 deletions tests/services/events/test_extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import json

import pytest


@pytest.fixture
def jp_server_config():
config = {
"ServerApp": {
"jpserver_extensions": {"tests.services.events.mockextension": True},
},
"EventBus": {"allowed_schemas": ["event.mockextension.jupyter.com/message"]},
}
return config


async def test_subscribe_websocket(jp_ws_fetch, jp_fetch):
# Open an event listener websocket
ws = await jp_ws_fetch("/api/events/subscribe")

# Hit the extension endpoint that emits an event
await jp_fetch("/mock/event")

# Check the event listener for a message
message = await ws.read_message()
event_data = json.loads(message)

# Close websocket
ws.close()

# Verify that an event message was received.
assert event_data.get("event_message") == "Hello world, from mock extension!"

0 comments on commit 1361a37

Please sign in to comment.