Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #12786: Create hook for dispatching GUI messages asynchronously #589

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions ipykernel/inprocess/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,8 @@ def _dispatch_to_kernel(self, msg):
raise RuntimeError('Cannot send request. No kernel exists.')

stream = kernel.shell_stream
self.session.send(stream, msg)
msg_parts = stream.recv_multipart()
loop = asyncio.get_event_loop()
loop.run_until_complete(kernel.dispatch_shell(msg_parts))
loop.run_until_complete(kernel.dispatch_shell(msg))
idents, reply_msg = self.session.recv(stream, copy=False)
self.shell_channel.call_handlers_later(reply_msg)

Expand Down
15 changes: 13 additions & 2 deletions ipykernel/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ def _user_ns_changed(self, change):
_sys_raw_input = Any()
_sys_eval_input = Any()

comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]

def __init__(self, **kwargs):
super(IPythonKernel, self).__init__(**kwargs)

Expand Down Expand Up @@ -102,8 +104,7 @@ def __init__(self, **kwargs):
self.comm_manager = CommManager(parent=self, kernel=self)

self.shell.configurables.append(self.comm_manager)
comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
for msg_type in comm_msg_types:
for msg_type in self.comm_msg_types:
self.shell_handlers[msg_type] = getattr(self.comm_manager, msg_type)

if _use_appnope() and self._darwin_app_nap:
Expand Down Expand Up @@ -583,6 +584,16 @@ def do_clear(self):
self.shell.reset(False)
return dict(status='ok')

def should_dispatch_immediately(self, msg):
try:
msg_type = msg['header']['msg_type']
if msg_type in self.comm_msg_types:
return True
except ValueError:
pass

return False


# This exists only for backwards compatibility - use IPythonKernel instead

Expand Down
46 changes: 33 additions & 13 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import time
import uuid
import warnings
from contextvars import ContextVar

try:
# jupyter_client >= 5, use tz-aware now
Expand Down Expand Up @@ -134,7 +135,7 @@ def _default_ident(self):

# track associations with current request
_allow_stdin = Bool(False)
_parents = Dict({"shell": {}, "control": {}})
_parents = Dict({"shell": ContextVar("shell_parent", default={}), "control": ContextVar("control_parent", default={})})
_parent_ident = Dict({'shell': b'', 'control': b''})

@property
Expand Down Expand Up @@ -301,18 +302,14 @@ def should_handle(self, stream, msg, idents):
return False
return True

async def dispatch_shell(self, msg):
async def dispatch_shell(self, msg, idents=None):
"""dispatch shell requests"""

# flush control queue before handling shell requests
await self._flush_control_queue()

idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except Exception:
self.log.error("Invalid Message", exc_info=True)
return
if idents is None:
idents = []

# Set the parent message for side effects.
self.set_parent(idents, msg, channel='shell')
Expand Down Expand Up @@ -466,15 +463,38 @@ async def dispatch_queue(self):
def _message_counter_default(self):
return itertools.count()

def schedule_dispatch(self, dispatch, *args):
def should_dispatch_immediately(self, msg):
"""
This provides a hook for dispatching incoming messages
from the frontend immediately, and out of order.

It could be used to allow asynchronous messages from
GUIs to be processed.
"""
return False

def schedule_dispatch(self, msg, dispatch):
"""schedule a message for dispatch"""

idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid shell message", exc_info=True)
return

new_args = (msg, idents)

if self.should_dispatch_immediately(msg):
return self.io_loop.add_callback(dispatch, *new_args)

idx = next(self._message_counter)

self.msg_queue.put_nowait(
(
idx,
dispatch,
args,
new_args,
)
)
# ensure the eventloop wakes up
Expand All @@ -498,7 +518,7 @@ def start(self):
self.shell_stream.on_recv(
partial(
self.schedule_dispatch,
self.dispatch_shell,
dispatch=self.dispatch_shell,
),
copy=False,
)
Expand Down Expand Up @@ -556,7 +576,7 @@ def set_parent(self, ident, parent, channel='shell'):
on the stdin channel.
"""
self._parent_ident[channel] = ident
self._parents[channel] = parent
self._parents[channel].set(parent)

def get_parent(self, channel="shell"):
"""Get the parent request associated with a channel.
Expand All @@ -573,7 +593,7 @@ def get_parent(self, channel="shell"):
message : dict
the parent message for the most recent request on the channel.
"""
return self._parents.get(channel, {})
return self._parents.get(channel, {}).get({})

def send_response(self, stream, msg_or_type, content=None, ident=None,
buffers=None, track=False, header=None, metadata=None, channel='shell'):
Expand Down