Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic Event Bus #820

Merged
merged 9 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions jupyter_server/base/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ def kernel_spec_manager(self):
def config_manager(self):
return self.settings["config_manager"]

@property
def event_bus(self):
return self.settings["event_bus"]

# ---------------------------------------------------------------
# CORS
# ---------------------------------------------------------------
Expand Down
22 changes: 22 additions & 0 deletions jupyter_server/serverapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
AsyncContentsManager,
ContentsManager,
)
from jupyter_server.services.events.bus import EventBus
from jupyter_server.services.kernels.kernelmanager import (
AsyncMappingKernelManager,
MappingKernelManager,
Expand Down Expand Up @@ -164,6 +165,7 @@
sessions=["jupyter_server.services.sessions.handlers"],
shutdown=["jupyter_server.services.shutdown"],
view=["jupyter_server.view.handlers"],
events=["jupyter_server.services.events.handlers"],
)

# Added for backwards compatibility from classic notebook server.
Expand Down Expand Up @@ -207,6 +209,7 @@ def __init__(
session_manager,
kernel_spec_manager,
config_manager,
event_bus,
extra_services,
log,
base_url,
Expand Down Expand Up @@ -242,6 +245,7 @@ def __init__(
session_manager,
kernel_spec_manager,
config_manager,
event_bus,
extra_services,
log,
base_url,
Expand All @@ -263,6 +267,7 @@ def init_settings(
session_manager,
kernel_spec_manager,
config_manager,
event_bus,
extra_services,
log,
base_url,
Expand Down Expand Up @@ -354,6 +359,7 @@ def init_settings(
config_manager=config_manager,
authorizer=authorizer,
identity_provider=identity_provider,
event_bus=event_bus,
# handlers
extra_services=extra_services,
# Jupyter stuff
Expand Down Expand Up @@ -769,6 +775,7 @@ class ServerApp(JupyterApp):
GatewaySessionManager,
GatewayClient,
Authorizer,
EventBus,
]

subcommands = dict(
Expand All @@ -794,6 +801,7 @@ class ServerApp(JupyterApp):
"sessions",
"shutdown",
"view",
"events",
)

_log_formatter_cls = LogFormatter
Expand Down Expand Up @@ -1561,6 +1569,12 @@ def _default_kernel_spec_manager_class(self):
),
)

event_bus = Instance(
EventBus,
allow_none=True,
help="An EventBus for emitting structured event data from Jupyter Server and extensions.",
)
Comment on lines +1572 to +1576
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Event bus is instantiated down on line 1925. What is the purpose of this statement?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is merely defining the trait type and validation that traitlets should do once the trait is created on line 1925. This doesn't do the actual instantiation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explaining.


info_file = Unicode()

@default("info_file")
Expand Down Expand Up @@ -1906,6 +1920,10 @@ def init_logging(self):
logger.parent = self.log
logger.setLevel(self.log.level)

def init_eventbus(self):
"""Initialize the Event Bus."""
self.event_bus = EventBus.instance(parent=self)

def init_webapp(self):
"""initialize tornado webapp"""
self.tornado_settings["allow_origin"] = self.allow_origin
Expand Down Expand Up @@ -1970,6 +1988,7 @@ def init_webapp(self):
self.session_manager,
self.kernel_spec_manager,
self.config_manager,
self.event_bus,
self.extra_services,
self.log,
self.base_url,
Expand Down Expand Up @@ -2436,6 +2455,7 @@ def initialize(
if find_extensions:
self.find_server_extensions()
self.init_logging()
self.init_eventbus()
self.init_server_extensions()

# Special case the starter extension and load
Expand Down Expand Up @@ -2762,6 +2782,8 @@ async def _cleanup(self):
await self.cleanup_kernels()
if getattr(self, "session_manager", None):
self.session_manager.close()
if getattr(self, "event_bus", None):
self.event_bus.clear_instance()

def start_ioloop(self):
"""Start the IO Loop."""
Expand Down
Empty file.
15 changes: 15 additions & 0 deletions jupyter_server/services/events/bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""An EventBus for use in the Jupyter server.

.. versionadded:: 2.0
"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
from jupyter_telemetry.eventlog import EventLog
from traitlets.config import SingletonConfigurable


class EventBus(EventLog, SingletonConfigurable):
"""A singleton eventlog that behaves as an event
bus for emitting Jupyter Server (and extension)
event data.
"""
80 changes: 80 additions & 0 deletions jupyter_server/services/events/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""A Websocket Handler for emitting Jupyter server events.

.. versionadded:: 2.0
"""
import logging

from jupyter_telemetry.eventlog import _skip_message
from pythonjsonlogger import jsonlogger
from tornado import web, websocket

from jupyter_server.base.handlers import JupyterHandler

AUTH_RESOURCE = "events"


class WebSocketLoggingHandler(logging.Handler):
"""Python logging handler that routes records to a Tornado websocket."""

def __init__(self, websocket, *args, **kwargs):
super().__init__(*args, **kwargs)
self.websocket = websocket

def emit(self, record):
"""Emit the message across the websocket"""
self.websocket.write_message(record.msg)


class SubscribeWebsocket(
JupyterHandler,
websocket.WebSocketHandler,
):
"""Websocket handler for subscribing to events"""

auth_resource = AUTH_RESOURCE

def pre_get(self):
"""Handles authentication/authorization when
attempting to subscribe to events emitted by
Jupyter Server's eventbus.
"""
# authenticate the request before opening the websocket
user = self.current_user
if user is None:
self.log.warning("Couldn't authenticate WebSocket connection")
raise web.HTTPError(403)

# authorize the user.
if not self.authorizer.is_authorized(self, user, "execute", "events"):
raise web.HTTPError(403)

async def get(self, *args, **kwargs):
self.pre_get()
res = super().get(*args, **kwargs)
await res

@property
def event_bus(self):
"""Jupyter Server's event bus that emits structured event data."""
return self.settings["event_bus"]

def open(self):
"""Routes events that are emitted by Jupyter Server's
EventBus to a WebSocket client in the browser.
"""
self.logging_handler = WebSocketLoggingHandler(self)
# Add a JSON formatter to the handler.
formatter = jsonlogger.JsonFormatter(json_serializer=_skip_message)
self.logging_handler.setFormatter(formatter)
Zsailer marked this conversation as resolved.
Show resolved Hide resolved
# To do: add an eventlog.add_handler method to jupyter_telemetry.
self.event_bus.log.addHandler(self.logging_handler)
self.event_bus.handlers.append(self.logging_handler)
Zsailer marked this conversation as resolved.
Show resolved Hide resolved

def on_close(self):
self.event_bus.log.removeHandler(self.logging_handler)
Zsailer marked this conversation as resolved.
Show resolved Hide resolved
self.event_bus.handlers.remove(self.logging_handler)


default_handlers = [
(r"/api/events/subscribe", SubscribeWebsocket),
]
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies = [
"tornado>=6.1.0",
"traitlets>=5.1",
"websocket-client",
"jupyter_telemetry"
]

[project.readme]
Expand Down
Empty file.
13 changes: 13 additions & 0 deletions tests/services/events/mock_event.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
$id: event.mock.jupyter.com/message
version: 1
title: Message
description: |
Emit a message
type: object
properties:
event_message:
title: Event Messages
description: |
Mock event message to read.
required:
- event_message
10 changes: 10 additions & 0 deletions tests/services/events/mockextension/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .mock_extension import _load_jupyter_server_extension # noqa: F401

# Function that makes these extensions discoverable
# by the test functions.


def _jupyter_server_extension_points():
return [
{"module": "tests.services.events.mockextension"},
]
23 changes: 23 additions & 0 deletions tests/services/events/mockextension/mock_extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pathlib

from jupyter_server.base.handlers import JupyterHandler
from jupyter_server.utils import url_path_join


class MockEventHandler(JupyterHandler):
def get(self):
# Emit an event.
self.event_bus.record_event(
schema_name="event.mockextension.jupyter.com/message",
version=1,
event={"event_message": "Hello world, from mock extension!"},
)


def _load_jupyter_server_extension(serverapp):
# Register a schema with the EventBus
schema_file = pathlib.Path(__file__).parent / "mock_extension_event.yaml"
serverapp.event_bus.register_schema_file(schema_file)
serverapp.web_app.add_handlers(
".*$", [(url_path_join(serverapp.base_url, "/mock/event"), MockEventHandler)]
)
13 changes: 13 additions & 0 deletions tests/services/events/mockextension/mock_extension_event.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
$id: event.mockextension.jupyter.com/message
version: 1
title: Message
description: |
Emit a message
type: object
properties:
event_message:
title: Event Message
description: |
Mock event message to read.
required:
- event_message
32 changes: 32 additions & 0 deletions tests/services/events/test_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import json
import pathlib

import pytest


@pytest.fixture
def event_bus(jp_serverapp):
event_bus = jp_serverapp.event_bus
# Register the event schema defined in this directory.
schema_file = pathlib.Path(__file__).parent / "mock_event.yaml"
event_bus.register_schema_file(schema_file)
#
event_bus.allowed_schemas = ["event.mock.jupyter.com/message"]
return event_bus


async def test_subscribe_websocket(jp_ws_fetch, event_bus):
# Open a websocket connection.
ws = await jp_ws_fetch("/api/events/subscribe")

event_bus.record_event(
schema_name="event.mock.jupyter.com/message",
version=1,
event={"event_message": "Hello, world!"},
)
message = await ws.read_message()
event_data = json.loads(message)
# Close websocket
ws.close()

assert event_data.get("event_message") == "Hello, world!"
32 changes: 32 additions & 0 deletions tests/services/events/test_extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import json

import pytest


@pytest.fixture
def jp_server_config():
config = {
"ServerApp": {
"jpserver_extensions": {"tests.services.events.mockextension": True},
},
"EventBus": {"allowed_schemas": ["event.mockextension.jupyter.com/message"]},
}
return config


async def test_subscribe_websocket(jp_ws_fetch, jp_fetch):
# Open an event listener websocket
ws = await jp_ws_fetch("/api/events/subscribe")

# Hit the extension endpoint that emits an event
await jp_fetch("/mock/event")

# Check the event listener for a message
message = await ws.read_message()
event_data = json.loads(message)

# Close websocket
ws.close()

# Verify that an event message was received.
assert event_data.get("event_message") == "Hello world, from mock extension!"