Skip to content

Commit

Permalink
Handle kernels with no heartbeat (jupyter-server#376)
Browse files Browse the repository at this point in the history
* add plugins for reconnecting to disconnected kernels

* allow user to restart the kernel heartbeat from the kernel menu

* remove dev print statement

* revert is_alive trigger used for testing reconnect dialog

* rename disconnectedkernel to noheartbeat

* update dependencY

* eventlistener singleton must be cleared in tests

* Update data_studio_jupyter_extensions/configurables/kernel_restarter.py

Co-authored-by: Andrey Velichkevich <[email protected]>

* remove old debugging prints

Co-authored-by: Andrey Velichkevich <[email protected]>
  • Loading branch information
2 people authored and GitHub Enterprise committed May 18, 2022
1 parent 3560d6a commit 0b86ad9
Show file tree
Hide file tree
Showing 18 changed files with 319 additions and 66 deletions.
7 changes: 7 additions & 0 deletions data_studio_jupyter_extensions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def _jupyter_server_extension_points(): # pragma: no cover
from data_studio_jupyter_extensions.extensions.external_links.extension import (
ExternalLinksExtension,
)
from data_studio_jupyter_extensions.extensions.kernel_actions.extension import (
KernelActionsExtension,
)

return [
{
Expand All @@ -49,4 +52,8 @@ def _jupyter_server_extension_points(): # pragma: no cover
"module": "data_studio_jupyter_extensions.extensions.external_links.extension",
"app": ExternalLinksExtension,
},
{
"module": "data_studio_jupyter_extensions.extensions.external_links.extension",
"app": KernelActionsExtension,
},
]
38 changes: 25 additions & 13 deletions data_studio_jupyter_extensions/configurables/kernel_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,31 @@ def _dead_state(self):
def _disconnected_state(self):
self.log.warning(f"No heartbeat detected for: {self.kernel_manager.kernel_id}")
self._emit(
state=constants.KERNEL_STATE.DISCONNECTED,
msg="Kernel appears to be running, but a connection could not be established.",
state=constants.KERNEL_STATE.NO_HEARTBEAT,
msg="Kernel was found, but no heartbeat was detected.",
)
# Trigger a "kernel-disconnected" event.
self.event_bus.record_event(
schema_name="event.datastudio.jupyter.com/kernel-no-heartbeat",
version=1,
event={
"notebook_id": self.notebook_id or "Not set",
"process_id": self.process_id or "Not set yet",
"kernel_id": self.kernel_id,
},
)
self._attempt_count = 0
self.stop()

def start(self):
"""Start the polling of the kernel."""
self._emit(
state=constants.KERNEL_STATE.CONNECTING,
msg="Waiting for a kernel heartbeat.",
)
# Reset all counters for the poller.
self._start_time = time.time()
self._attempt_count = 1
super().start()

async def poll(self):
Expand All @@ -95,7 +112,7 @@ async def poll(self):

# If the kernel is communicating, we're good here.
if km.is_communicating():
if not self._connected_once or self._attempt_count > 0:
if not self._connected_once or self._attempt_count == 1:
self._emit(
state=constants.KERNEL_STATE.CONNECTED,
msg="Kernel heartbeat established.",
Expand All @@ -108,23 +125,18 @@ async def poll(self):

# Check if the kernel ever successfully connected.
if not self._connected_once:
self._emit(
state=constants.KERNEL_STATE.CONNECTING,
msg="Waiting for a kernel heartbeat.",
)
# Kernel is disconnected due to timeout.
if km.heartbeat_timeout < (now - self._start_time):
self._disconnected_state()
await self.kernel_manager.shutdown_kernel()
# Kernel connected before; but a heartbeat was missed
else:
self._attempt_count += 1
if self._attempt_count == 1:
if self._attempt_count < self.restart_limit:
self._emit(
state=constants.KERNEL_STATE.DISCONNECTED,
msg="Missed a kernel heartbeat. Trying to reconnect.",
state=constants.KERNEL_STATE.MISSED_HEARTBEAT,
msg="Listening for another heartbeat.",
)
elif self._attempt_count == self.restart_limit:
self._attempt_count += 1
elif self._attempt_count >= self.restart_limit:
self._last_attempt = now
# If we don't get a working heartbeat, check to see if
# the kernel pod is even running.
Expand Down
36 changes: 22 additions & 14 deletions data_studio_jupyter_extensions/configurables/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,26 @@ async def send_signal(self, signum: int) -> None:
"""
pass

async def _stop_kernel(self):
"""Custom method (not part of the Provisioners ABC) for stopping kernels
on notebook service.
"""
self._emit(
constants.KERNEL_STATE.TERMINATING, msg="Shutting down the current kernel."
)
# If the stop_kernel
try:
await self.nbservice_client.stop_kernel(self.process_id)
except HTTPClientError:
self.log.warn(
f"Kernel {self.kernel_id} wasn't found in Notebook Service. Proceeding with the rest of the kernel shutdown sequence."
)
pass
self._emit(
constants.KERNEL_STATE.DEAD, msg="Kernel has been successfully terminated."
)
self.process_id = None

async def kill(self, restart: bool = False) -> None:
"""
Kill the kernel process.
Expand All @@ -362,14 +382,7 @@ async def kill(self, restart: bool = False) -> None:
restart is True if this operation will precede a subsequent launch_kernel request.
"""
self._emit(
constants.KERNEL_STATE.TERMINATING, msg="Shutting down the current kernel."
)
await self.nbservice_client.stop_kernel(self.process_id)
self._emit(
constants.KERNEL_STATE.DEAD, msg="Kernel has been successfully terminated."
)
self.process_id = None
await self._stop_kernel()

async def terminate(self, restart: bool = False) -> None:
"""
Expand All @@ -382,12 +395,7 @@ async def terminate(self, restart: bool = False) -> None:
restart is True if this operation precedes a start launch_kernel request.
"""
self._emit(
constants.KERNEL_STATE.TERMINATING, msg="Terminating the current kernel."
)
await self.nbservice_client.stop_kernel(self.process_id)
self._emit(constants.KERNEL_STATE.DEAD, msg="Kernel is terminated.")
self.process_id = None
await self._stop_kernel()

async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
return await super().pre_launch(cmd=[], **kwargs)
Expand Down
4 changes: 4 additions & 0 deletions data_studio_jupyter_extensions/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
"CONNECTING",
"CONNECTED",
"DISCONNECTED",
"MISSED_HEARTBEAT",
"NO_HEARTBEAT",
"RECONNECTING",
],
)
Expand All @@ -55,6 +57,8 @@
"connecting",
"connected",
"disconnected",
"missed heartbeat",
"no heartbeat",
"reconnecting",
)
# states are listed here: https://jupyter-client.readthedocs.io/en/stable/messaging.html?highlight=execution_state#kernel-status
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
$id: event.datastudio.jupyter.com/kernel-no-heartbeat
version: 1
title: Kernel Heartbeat Failure
description: |
The kernel has no heartbeat.
type: object
properties:
notebook_id:
title: Notebook Server ID
description: |
UUID for this notebook server process.
process_id:
title: Kernel Process ID
description: |
UUID for this kernel process.
kernel_id:
title: Kernel ID
description: |
UUID for this kernel
required:
- notebook_id
- process_id
- kernel_id
4 changes: 2 additions & 2 deletions data_studio_jupyter_extensions/extensions/events/emitters.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def emit(self, record):
kernel_id = record.msg["kernel_id"]
sm = self.serverapp.session_manager
# Record event for any pending sessions
for record in sm._pending_sessions._records:
if record.kernel_id == kernel_id:
for session in sm._pending_sessions._records:
if session.kernel_id == kernel_id:
data = record.msg.copy()
kernel_path = self._kernel_path(record.path, record.name)
data.update(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"event.datastudio.jupyter.com/kernel-message",
"event.datastudio.jupyter.com/kernel-blocked",
"event.datastudio.jupyter.com/kernel-failed",
"event.datastudio.jupyter.com/kernel-no-heartbeat",
"event.datastudio.jupyter.com/syncing-state",
]

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from jupyter_server.extension.application import ExtensionApp

from .handlers import handlers


class KernelActionsExtension(ExtensionApp):
"""Jupyter Server extension that verifies
the health of the server.
"""

name = "kernel_actions"
handlers = handlers
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""
Replaces the open source version of the Kernel Actions Endpoint
with a new extension that handles some custom actions.
"""
from jupyter_server.auth.decorator import authorized
from jupyter_server.services.kernels.handlers import _kernel_id_regex
from jupyter_server.services.kernels.handlers import KernelActionHandler
from tornado import web


class DataStudioKernelActionHandler(KernelActionHandler):
@web.authenticated
@authorized
async def post(self, kernel_id, action):
kernel = self.kernel_manager.get_kernel(kernel_id)
if action == "monitor":
kernel._restarter.start()
await super().post(kernel_id, action)


_kernel_action_regex = r"(?P<action>restart|interrupt|monitor)"

handlers = [
(
rf"/api/kernels/{_kernel_id_regex}/{_kernel_action_regex}",
DataStudioKernelActionHandler,
),
]
70 changes: 45 additions & 25 deletions src/eventlistener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,57 +9,65 @@ import { PageConfig, URLExt } from '@jupyterlab/coreutils';
* functions when the event occurs.
*/
export class EventListener {
private static instance: EventListener;
private static instance: EventListener | null;
callbacks: Record<string, Function> = {};
url: string;
ws: WebSocket | null;

private constructor() {
this.url =
PageConfig.getOption('studioSubscribeURL') ||
URLExt.join(PageConfig.getWsUrl(), 'subscribe');

this.ws = null;
this.freshWebsocket();
}
/**
* Get an instance of the singleton. If an instance doesn't
* already exists, create new one.
* @returns EventListener
*/
static getInstance(): EventListener {
if (!EventListener.instance) {
EventListener.instance = new EventListener();
}
return EventListener.instance;
}
/**
* Connect a (long-lived) websocket to the
* `/subscribe` endpoint.
*/
public connect() {

public freshWebsocket() {
let listener = this;
if (document.hidden) {
setTimeout(() => {
this.connect();
listener.freshWebsocket();
}, 1000);
return;
}
const ws = new WebSocket(this.url);
let listener = this;
ws.onclose = function (event: CloseEvent) {
this.ws = new WebSocket(this.url);
this.ws.onclose = function (event: CloseEvent) {
// If the websocket is closed on purpose,
// don't create a new one.
if (event.code === 1000) {
return;
}
/* istanbul ignore next */
setTimeout(() => {
listener.connect();
listener.freshWebsocket();
}, 1000);
};
// Attach listeners
for (let name in this.callbacks) {
let callback = this.callbacks[name];
ws.addEventListener('message', event => {
callback(event);
});
this.connectCallback(callback);
}
}

/**
* Get an instance of the singleton. If an instance doesn't
* already exists, create new one.
* @returns EventListener
*/
static getInstance(): EventListener {
if (!EventListener.instance) {
EventListener.instance = new EventListener();
}
return EventListener.instance;
}

static clearInstance() {
if (EventListener.instance) {
EventListener.instance = null;
}
}

/**
* Attach a callback to an event that will be triggered when
* that event comes across the `/subscribe` endpoint.
Expand All @@ -75,5 +83,17 @@ export class EventListener {
}
};
this.callbacks[eventId] = listener;
// Add as event listener to the websocket.
if (this.ws) {
this.connectCallback(listener);
}
}

private connectCallback(callback: any) {
if (this.ws) {
this.ws.addEventListener('message', event => {
callback(event);
});
}
}
}
4 changes: 3 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { DisableAutoStartingKernelsPlugin } from './autostartingkernels';
import { NotebookPixiedustShimPlugin } from './pixiedustshim';
import { KernelInfoPlugin } from './kernelinfo';
import { RunningTabPlugin } from './runningtab';
import { NoKernelHeartbeatPlugin } from './noheartbeat';

const plugins: JupyterFrontEndPlugin<any>[] = [
kernelStatusPlugin,
Expand All @@ -31,7 +32,8 @@ const plugins: JupyterFrontEndPlugin<any>[] = [
DisableAutoStartingKernelsPlugin,
NotebookPixiedustShimPlugin,
KernelInfoPlugin,
RunningTabPlugin
RunningTabPlugin,
NoKernelHeartbeatPlugin
];

export default plugins;
2 changes: 0 additions & 2 deletions src/kernelblocked.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,5 @@ export const KernelBlockedPlugin: JupyterFrontEndPlugin<void> = {
showErrorMessage('409: Kernel Action Blocked', data.message);
}
);

listener.connect();
}
};
2 changes: 0 additions & 2 deletions src/kernelfailed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,5 @@ export const KernelFailedPlugin: JupyterFrontEndPlugin<void> = {
showErrorMessage('Kernel launch failed', data.message);
}
);

listener.connect();
}
};
Loading

0 comments on commit 0b86ad9

Please sign in to comment.