From 953eb4b1457ba2384ba6b8daf480292a5acacca8 Mon Sep 17 00:00:00 2001 From: RK Date: Mon, 1 Feb 2021 09:44:10 +0000 Subject: [PATCH] Issue #12786: Create hook for dispatching messages out of order --- ipykernel/inprocess/client.py | 4 +--- ipykernel/kernelbase.py | 37 ++++++++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/ipykernel/inprocess/client.py b/ipykernel/inprocess/client.py index 1784a6eaa..f424eedee 100644 --- a/ipykernel/inprocess/client.py +++ b/ipykernel/inprocess/client.py @@ -173,10 +173,8 @@ def _dispatch_to_kernel(self, msg): raise RuntimeError('Cannot send request. No kernel exists.') stream = kernel.shell_stream - self.session.send(stream, msg) - msg_parts = stream.recv_multipart() loop = asyncio.get_event_loop() - loop.run_until_complete(kernel.dispatch_shell(msg_parts)) + loop.run_until_complete(kernel.dispatch_shell(msg)) idents, reply_msg = self.session.recv(stream, copy=False) self.shell_channel.call_handlers_later(reply_msg) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index e391411e4..36f15f5a4 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -306,12 +306,8 @@ async def dispatch_shell(self, msg): # flush control queue before handling shell requests await self._flush_control_queue() - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except Exception: - self.log.error("Invalid Message", exc_info=True) - return + if idents is None: + idents = [] # Set the parent message for side effects. self.set_parent(idents, msg, channel='shell') @@ -465,15 +461,38 @@ async def dispatch_queue(self): def _message_counter_default(self): return itertools.count() - def schedule_dispatch(self, dispatch, *args): + def should_dispatch_immediately(self, msg): + """ + This provides a hook for dispatching incoming messages + from the frontend immediately, and out of order. + + It could be used to allow asynchronous messages from + GUIs to be processed. + """ + return False + + def schedule_dispatch(self, msg, dispatch): """schedule a message for dispatch""" + + idents, msg = self.session.feed_identities(msg, copy=False) + try: + msg = self.session.deserialize(msg, content=True, copy=False) + except: + self.log.error("Invalid shell message", exc_info=True) + return + + new_args = (msg, idents) + + if self.should_dispatch_immediately(msg): + return self.io_loop.add_callback(dispatch, *new_args) + idx = next(self._message_counter) self.msg_queue.put_nowait( ( idx, dispatch, - args, + new_args, ) ) # ensure the eventloop wakes up @@ -497,7 +516,7 @@ def start(self): self.shell_stream.on_recv( partial( self.schedule_dispatch, - self.dispatch_shell, + dispatch=self.dispatch_shell, ), copy=False, )