Skip to content

Commit

Permalink
Nudge on the control channel instead of the shell (#628)
Browse files Browse the repository at this point in the history
  • Loading branch information
JohanMabille authored Dec 3, 2021
1 parent e5cb6c4 commit 7e11d8e
Showing 1 changed file with 10 additions and 22 deletions.
32 changes: 10 additions & 22 deletions jupyter_server/services/kernels/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,28 +131,16 @@ def create_stream(self):
def nudge(self):
"""Nudge the zmq connections with kernel_info_requests
Returns a Future that will resolve when we have received
a shell reply and at least one iopub message,
a control reply and at least one iopub message,
ensuring that zmq subscriptions are established,
sockets are fully connected, and kernel is responsive.
Keeps retrying kernel_info_request until these are both received.
"""
kernel = self.kernel_manager.get_kernel(self.kernel_id)

# Do not nudge busy kernels as kernel info requests sent to shell are
# queued behind execution requests.
# nudging in this case would cause a potentially very long wait
# before connections are opened,
# plus it is *very* unlikely that a busy kernel will not finish
# establishing its zmq subscriptions before processing the next request.
if getattr(kernel, "execution_state") == "busy":
self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id)
f = Future()
f.set_result(None)
return f

# Use a transient shell channel to prevent leaking
# shell responses to the front-end.
shell_channel = kernel.connect_shell()
# Use a transient control channel to prevent leaking
# control responses to the front-end.
control_channel = kernel.connect_control()
# The IOPub used by the client, whose subscriptions we are verifying.
iopub_channel = self.channels["iopub"]

Expand All @@ -172,13 +160,13 @@ def cleanup(_=None):
"""Common cleanup"""
loop.remove_timeout(nudge_handle)
iopub_channel.stop_on_recv()
if not shell_channel.closed():
shell_channel.close()
if not control_channel.closed():
control_channel.close()

# trigger cleanup when both message futures are resolved
both_done.add_done_callback(cleanup)

def on_shell_reply(msg):
def on_control_reply(msg):
self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
if not info_future.done():
self.log.debug("Nudge: resolving shell future: %s", self.kernel_id)
Expand All @@ -192,7 +180,7 @@ def on_iopub(msg):
iopub_future.set_result(None)

iopub_channel.on_recv(on_iopub)
shell_channel.on_recv(on_shell_reply)
control_channel.on_recv(on_control_reply)
loop = IOLoop.current()

# Nudge the kernel with kernel info requests until we get an IOPub message
Expand All @@ -213,15 +201,15 @@ def nudge(count):
return

# check for closed zmq socket
if shell_channel.closed():
if control_channel.closed():
self.log.debug("Nudge: cancelling on closed zmq socket: %s", self.kernel_id)
finish()
return

if not both_done.done():
log = self.log.warning if count % 10 == 0 else self.log.debug
log("Nudge: attempt %s on kernel %s" % (count, self.kernel_id))
self.session.send(shell_channel, "kernel_info_request")
self.session.send(control_channel, "kernel_info_request")
nonlocal nudge_handle
nudge_handle = loop.call_later(0.5, nudge, count)

Expand Down

0 comments on commit 7e11d8e

Please sign in to comment.