Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nudge on the control channel instead of the shell #628

Merged
merged 1 commit into from
Dec 3, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 10 additions & 22 deletions jupyter_server/services/kernels/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,28 +131,16 @@ def create_stream(self):
def nudge(self):
"""Nudge the zmq connections with kernel_info_requests
Returns a Future that will resolve when we have received
a shell reply and at least one iopub message,
a control reply and at least one iopub message,
ensuring that zmq subscriptions are established,
sockets are fully connected, and kernel is responsive.
Keeps retrying kernel_info_request until these are both received.
"""
kernel = self.kernel_manager.get_kernel(self.kernel_id)

# Do not nudge busy kernels as kernel info requests sent to shell are
# queued behind execution requests.
# nudging in this case would cause a potentially very long wait
# before connections are opened,
# plus it is *very* unlikely that a busy kernel will not finish
# establishing its zmq subscriptions before processing the next request.
if getattr(kernel, "execution_state") == "busy":
self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id)
f = Future()
f.set_result(None)
return f

# Use a transient shell channel to prevent leaking
# shell responses to the front-end.
shell_channel = kernel.connect_shell()
# Use a transient control channel to prevent leaking
# control responses to the front-end.
control_channel = kernel.connect_control()
# The IOPub used by the client, whose subscriptions we are verifying.
iopub_channel = self.channels["iopub"]

Expand All @@ -172,13 +160,13 @@ def cleanup(_=None):
"""Common cleanup"""
loop.remove_timeout(nudge_handle)
iopub_channel.stop_on_recv()
if not shell_channel.closed():
shell_channel.close()
if not control_channel.closed():
control_channel.close()

# trigger cleanup when both message futures are resolved
both_done.add_done_callback(cleanup)

def on_shell_reply(msg):
def on_control_reply(msg):
self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
if not info_future.done():
self.log.debug("Nudge: resolving shell future: %s", self.kernel_id)
Expand All @@ -192,7 +180,7 @@ def on_iopub(msg):
iopub_future.set_result(None)

iopub_channel.on_recv(on_iopub)
shell_channel.on_recv(on_shell_reply)
control_channel.on_recv(on_control_reply)
loop = IOLoop.current()

# Nudge the kernel with kernel info requests until we get an IOPub message
Expand All @@ -213,15 +201,15 @@ def nudge(count):
return

# check for closed zmq socket
if shell_channel.closed():
if control_channel.closed():
self.log.debug("Nudge: cancelling on closed zmq socket: %s", self.kernel_id)
finish()
return

if not both_done.done():
log = self.log.warning if count % 10 == 0 else self.log.debug
log("Nudge: attempt %s on kernel %s" % (count, self.kernel_id))
self.session.send(shell_channel, "kernel_info_request")
self.session.send(control_channel, "kernel_info_request")
nonlocal nudge_handle
nudge_handle = loop.call_later(0.5, nudge, count)

Expand Down