diff --git a/tractor/_actor.py b/tractor/_actor.py index f84a59785..04a1f9f71 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -317,7 +317,10 @@ def __init__( # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 self.loglevel = loglevel - self._arb_addr = arbiter_addr + + self._arb_addr = ( + str(arbiter_addr[0]), + int(arbiter_addr[1])) if arbiter_addr else None # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -598,7 +601,7 @@ async def _process_messages( shield: bool = False, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: - """Process messages for the channel async-RPC style. + """Process messages for a single channel async-RPC style. Receive multiplexed RPC requests and deliver responses over ``chan``. """ @@ -659,18 +662,47 @@ async def _process_messages( log.debug( f"Processing request from {actorid}\n" f"{ns}.{funcname}({kwargs})") + + # TODO: we should be able to remove this ``self`` + # hack by simply having either a special msg set or + # an internal machinery invocation system? We really + # don't need to strictly stick to implementing all + # mechanics as some form of task-oriented ipc. if ns == 'self': - func = getattr(self, funcname) + + # TODO: should we instead have some special + # messages for these? + # remote cancel request cases: if funcname == '_cancel_task': + # the 'cancel a specific task' case + # XXX: a special case is made here for - # remote calls since we don't want the - # remote actor have to know which channel - # the task is associated with and we can't - # pass non-primitive types between actors. - # This means you can use: - # Portal.run('self', '_cancel_task, cid=did) - # without passing the `chan` arg. - kwargs['chan'] = chan + # remote calls letting you use: + # ``Portal.run('self', '_cancel_task, cid=did)`` + await self._cancel_task(kwargs['cid'], chan) + continue + + elif funcname == 'cancel': + # the 'cancel the whole actor' case which + # shuts down all ``trio`` machinery making + # up this "actor" runtime. + # Triggered by a call to: + # ``Portal.run('self', 'cancel, cid=did)`` + await self.cancel() + + # self.cancel() was called so kill this msg loop + # and break out into ``_async_main()`` + log.warning( + f"Actor {self.uid} was remotely cancelled; " + "waiting on cancellation completion..") + await self._cancel_complete.wait() + loop_cs.cancel() + break + + else: + # 'self' means lookup a local actor method + func = getattr(self, funcname) + else: # complain to client about restricted modules try: @@ -683,35 +715,24 @@ async def _process_messages( # spin up a task for the requested function log.debug(f"Spawning task for {func}") - assert self._service_n - cs = await self._service_n.start( + service_n = self._service_n + assert service_n + cs = await service_n.start( partial(_invoke, self, cid, chan, func, kwargs), name=funcname, ) - # never allow cancelling cancel requests (results in - # deadlock and other weird behaviour) - if func != self.cancel: - if isinstance(cs, Exception): - log.warning( - f"Task for RPC func {func} failed with" - f"{cs}") - else: - # mark that we have ongoing rpc tasks - self._ongoing_rpc_tasks = trio.Event() - log.runtime(f"RPC func is {func}") - # store cancel scope such that the rpc task can be - # cancelled gracefully if requested - self._rpc_tasks[(chan, cid)] = ( - cs, func, trio.Event()) - else: - # self.cancel() was called so kill this msg loop - # and break out into ``_async_main()`` + if isinstance(cs, Exception): log.warning( - f"Actor {self.uid} was remotely cancelled; " - "waiting on cancellation completion..") - await self._cancel_complete.wait() - loop_cs.cancel() - break + f"Task for RPC func {func} failed with" + f"{cs}") + else: + # mark that we have ongoing rpc tasks + self._ongoing_rpc_tasks = trio.Event() + log.runtime(f"RPC func is {func}") + # store cancel scope such that the rpc task can be + # cancelled gracefully if requested + self._rpc_tasks[(chan, cid)] = ( + cs, func, trio.Event()) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") @@ -1075,13 +1096,14 @@ async def cancel(self) -> bool: # for n in root.child_nurseries: # n.cancel_scope.cancel() - async def _cancel_task(self, cid, chan): - """Cancel a local task by call-id / channel. + async def _cancel_task(self, cid, chan) -> bool: + '''Cancel a local task by call-id / channel. Note this method will be treated as a streaming function by remote actor-callers due to the declaration of ``ctx`` in the signature (for now). - """ + + ''' # right now this is only implicitly called by # streaming IPC but it should be called # to cancel any remotely spawned task @@ -1091,7 +1113,7 @@ async def _cancel_task(self, cid, chan): scope, func, is_complete = self._rpc_tasks[(chan, cid)] except KeyError: log.warning(f"{cid} has already completed/terminated?") - return + return False log.debug( f"Cancelling task:\ncid: {cid}\nfunc: {func}\n" @@ -1100,7 +1122,7 @@ async def _cancel_task(self, cid, chan): # don't allow cancelling this function mid-execution # (is this necessary?) if func is self._cancel_task: - return + raise RuntimeError('Attempting to cancel a cancel!?!?') scope.cancel() @@ -1114,6 +1136,8 @@ async def _cancel_task(self, cid, chan): f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") + return True + async def cancel_rpc_tasks( self, only_chan: Optional[Channel] = None,