Cancel RPC tasks and actor machinery greedily #240

Closed · wants to merge 1 commit
106 changes: 65 additions & 41 deletions tractor/_actor.py
@@ -317,7 +317,10 @@ def __init__(
# TODO: consider making this a dynamically defined
# @dataclass once we get py3.7
self.loglevel = loglevel
self._arb_addr = arbiter_addr

self._arb_addr = (
str(arbiter_addr[0]),
int(arbiter_addr[1])) if arbiter_addr else None

# marked by the process spawning backend at startup
# will be None for the parent-most process started manually
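For context, the replacement above coerces the incoming arbiter address into a canonical `(str, int)` host/port tuple (or `None`). Below is a minimal sketch of the same coercion as a standalone helper, assuming the motivation is that deserialized addresses can arrive as lists or with stringified ports; the helper name is hypothetical and the diff inlines the logic directly in `__init__`:

```python
from typing import Optional, Tuple

def canonicalize_addr(
    addr: Optional[Tuple[str, int]],
) -> Optional[Tuple[str, int]]:
    # Hypothetical helper mirroring the guard added above: wire
    # codecs can hand back a (host, port) pair as a list, and the
    # port may arrive as a numeric string, so coerce to a
    # well-typed, hashable tuple (or pass None through).
    if addr is None:
        return None
    host, port = addr
    return (str(host), int(port))

assert canonicalize_addr(('127.0.0.1', 1616)) == ('127.0.0.1', 1616)
assert canonicalize_addr(['localhost', '1616']) == ('localhost', 1616)
assert canonicalize_addr(None) is None
```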
@@ -598,7 +601,7 @@ async def _process_messages(
shield: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Process messages for the channel async-RPC style.
"""Process messages for a single channel async-RPC style.

Receive multiplexed RPC requests and deliver responses over ``chan``.
"""
@@ -659,18 +662,47 @@ async def _process_messages(
log.debug(
f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})")

# TODO: we should be able to remove this ``self``
# hack by simply having either a special msg set or
# an internal machinery invocation system? We really
# don't need to strictly stick to implementing all
# mechanics as some form of task-oriented ipc.
if ns == 'self':
func = getattr(self, funcname)

# TODO: should we instead have some special
# messages for these?
# remote cancel request cases:
if funcname == '_cancel_task':
# the 'cancel a specific task' case

# XXX: a special case is made here for
# remote calls since we don't want the
# remote actor to have to know which channel
# the task is associated with and we can't
# pass non-primitive types between actors.
# This means you can use:
# Portal.run('self', '_cancel_task', cid=did)
# without passing the `chan` arg.
kwargs['chan'] = chan
# remote calls letting you use:
# ``Portal.run('self', '_cancel_task', cid=did)``
await self._cancel_task(kwargs['cid'], chan)
continue

elif funcname == 'cancel':
# the 'cancel the whole actor' case which
# shuts down all ``trio`` machinery making
# up this "actor" runtime.
# Triggered by a call to:
# ``Portal.run('self', 'cancel')``
await self.cancel()

# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
log.warning(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await self._cancel_complete.wait()
loop_cs.cancel()
break

else:
# 'self' means lookup a local actor method
func = getattr(self, funcname)

else:
# complain to client about restricted modules
try:
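To make the new `'self'`-namespace cancel cases concrete, here is a hedged usage sketch from the caller's side, assuming tractor's `Portal.run(ns, funcname, **kwargs)` API that the comments above reference:

```python
import tractor

async def cancel_remote_task(portal: tractor.Portal, cid: str) -> None:
    # Cancel one specific remote task by call-id; per the msg-loop
    # special case above, the receiving actor injects the ``chan``
    # argument itself so only primitive types cross the wire.
    await portal.run('self', '_cancel_task', cid=cid)

async def cancel_remote_actor(portal: tractor.Portal) -> None:
    # Tear down the whole remote actor runtime; the remote msg loop
    # waits on ``_cancel_complete`` and then cancels itself.
    await portal.run('self', 'cancel')
```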
@@ -683,35 +715,24 @@

# spin up a task for the requested function
log.debug(f"Spawning task for {func}")
assert self._service_n
cs = await self._service_n.start(
service_n = self._service_n
assert service_n
cs = await service_n.start(
partial(_invoke, self, cid, chan, func, kwargs),
name=funcname,
)
# never allow cancelling cancel requests (results in
# deadlock and other weird behaviour)
if func != self.cancel:
if isinstance(cs, Exception):
log.warning(
f"Task for RPC func {func} failed with"
f"{cs}")
else:
# mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event()
log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
self._rpc_tasks[(chan, cid)] = (
cs, func, trio.Event())
else:
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
if isinstance(cs, Exception):
log.warning(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await self._cancel_complete.wait()
loop_cs.cancel()
break
f"Task for RPC func {func} failed with"
f"{cs}")
else:
# mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event()
log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
self._rpc_tasks[(chan, cid)] = (
cs, func, trio.Event())

log.debug(
f"Waiting on next msg for {chan} from {chan.uid}")
@@ -1075,13 +1096,14 @@ async def cancel(self) -> bool:
# for n in root.child_nurseries:
# n.cancel_scope.cancel()

async def _cancel_task(self, cid, chan):
"""Cancel a local task by call-id / channel.
async def _cancel_task(self, cid, chan) -> bool:
'''Cancel a local task by call-id / channel.

Note this method will be treated as a streaming function
by remote actor-callers due to the declaration of ``ctx``
in the signature (for now).
"""

'''
# right now this is only implicitly called by
# streaming IPC but it should be called
# to cancel any remotely spawned task
@@ -1091,7 +1113,7 @@ async def _cancel_task(self, cid, chan):
scope, func, is_complete = self._rpc_tasks[(chan, cid)]
except KeyError:
log.warning(f"{cid} has already completed/terminated?")
return
return False

log.debug(
f"Cancelling task:\ncid: {cid}\nfunc: {func}\n"
@@ -1100,7 +1122,7 @@ async def _cancel_task(self, cid, chan):
# don't allow cancelling this function mid-execution
# (is this necessary?)
if func is self._cancel_task:
return
raise RuntimeError('Attempting to cancel a cancel!?!?')

scope.cancel()

@@ -1114,6 +1136,8 @@
f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")

return True

async def cancel_rpc_tasks(
self,
only_chan: Optional[Channel] = None,
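Worth noting for callers: `_cancel_task` now reports its outcome as a `bool` instead of returning silently, and raises on an attempt to cancel the cancel machinery itself. A sketch with a hypothetical caller illustrating the new semantics:

```python
async def ensure_task_cancelled(actor, cid, chan) -> None:
    # Hypothetical caller illustrating the new return semantics:
    #   True  -> the task existed and its scope was cancelled,
    #   False -> it had already completed/terminated.
    # Asking ``_cancel_task`` to cancel itself now raises
    # ``RuntimeError`` instead of silently returning.
    if not await actor._cancel_task(cid, chan):
        print(f"task {cid} already completed; nothing to cancel")
```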