diff --git a/debugging/mp_debug.py b/debugging/mp_debug.py index c43a81064..dfbfa9177 100644 --- a/debugging/mp_debug.py +++ b/debugging/mp_debug.py @@ -17,12 +17,14 @@ async def main(): """ async with tractor.open_nursery() as n: - # portal = await n.run_in_actor('future_self', bubble) - portal = await n.run_in_actor('future_self', bail) + portal1 = await n.run_in_actor('bubble', bubble) + portal = await n.run_in_actor('bail', bail) + # await portal.result() + # await portal1.result() # The ``async with`` will unblock here since the 'some_linguist' # actor has completed its main task ``cellar_door``. if __name__ == '__main__': - tractor.run(main, loglevel='info', debug_mode=True) + tractor.run(main, loglevel='critical', debug_mode=True) diff --git a/tractor/_actor.py b/tractor/_actor.py index 88c814168..157b186c4 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -126,13 +126,14 @@ async def _invoke( task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) except (Exception, trio.MultiError) as err: - log.exception("Actor errored:") - # NOTE: don't enter debug mode recursively after quitting pdb if _state.debug_mode() and not isinstance(err, bdb.BdbQuit): # Allow for pdb control in parent from ._debug import post_mortem + log.exception("Actor crashed, entering debug mode:") await post_mortem() + else: + log.exception("Actor crashed:") # always ship errors back to caller err_msg = pack_error(err) @@ -181,6 +182,7 @@ class Actor: # Information about `__main__` from parent _parent_main_data: Dict[str, str] + _parent_chan_cs: Optional[trio.CancelScope] = None def __init__( self, @@ -639,8 +641,13 @@ async def _async_main( # processing parent requests until our server is # 100% up and running. if self._parent_chan: - nursery.start_soon( - self._process_messages, self._parent_chan) + self._parent_chan_cs = await nursery.start( + partial( + self._process_messages, + self._parent_chan, + shield=True, + ) + ) # Register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") @@ -656,6 +663,8 @@ async def _async_main( # Blocks here as expected until the channel server is # killed (i.e. this actor is cancelled or signalled by the parent) + self._parent_chan_cs.cancel() + except Exception as err: if not registered_with_arbiter: # TODO: I guess we could try to connect back @@ -711,6 +720,10 @@ async def _async_main( # or completed self.cancel_server() + # teardown msg loop with parent + if self._parent_chan_cs: + self._parent_chan_cs.cancel() + async def _serve_forever( self, *, @@ -773,6 +786,8 @@ async def cancel(self) -> None: for n in root.child_nurseries: n.cancel_scope.cancel() + self._parent_chan_cs.cancel() + async def _cancel_task(self, cid, chan): """Cancel a local task by call-id / channel. diff --git a/tractor/_debug.py b/tractor/_debug.py index e462b7647..5d3626be4 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -1,9 +1,7 @@ """ Multi-core debugging for da peeps! """ -import pdb import sys -import tty from functools import partial from typing import Awaitable, Tuple @@ -13,6 +11,16 @@ from .log import get_logger +try: + # wtf: only exported when installed in dev mode? + import pdbpp +except ImportError: + # pdbpp is installed in regular mode... + import pdb + assert pdb.xpm, "pdbpp is not installed?" + pdbpp = pdb + + log = get_logger(__name__) @@ -25,56 +33,96 @@ ) -def subactoruid2proc( - actor: 'Actor', # noqa - uid: Tuple[str, str] -) -> trio.Process: - n = actor._actoruid2nursery[uid] - _, proc, _ = n._children[uid] - return proc +_pdb_release_hook = None + + +class PdbwTeardown(pdbpp.Pdb): + """Add teardown hooks to the regular ``pdbpp.Pdb``. + """ + # TODO: figure out how to dissallow recursive .set_trace() entry + # since that'll cause deadlock for us. + def set_continue(self): + super().set_continue() + self.config.teardown(self) + + def set_quit(self): + super().set_quit() + self.config.teardown(self) + + +class TractorConfig(pdbpp.DefaultConfig): + """Custom ``pdbpp`` goodness. + """ + sticky_by_default = True + + def teardown(self, _pdb): + _pdb_release_hook(_pdb) + + +# override the pdbpp config with our coolio one +pdbpp.Pdb.DefaultConfig = TractorConfig + + +# TODO: will be needed whenever we get to true remote debugging. +# XXX see https://github.com/goodboy/tractor/issues/130 + +# def subactoruid2proc( +# actor: 'Actor', # noqa +# uid: Tuple[str, str] +# ) -> trio.Process: +# n = actor._actoruid2nursery[uid] +# _, proc, _ = n._children[uid] +# return proc + +# async def hijack_stdin(): +# log.info(f"Hijacking stdin from {actor.uid}") + +# trap std in and relay to subproc +# async_stdin = trio.wrap_file(sys.stdin) + +# async with aclosing(async_stdin): +# async for msg in async_stdin: +# log.trace(f"Stdin input:\n{msg}") +# # encode to bytes +# bmsg = str.encode(msg) + +# # relay bytes to subproc over pipe +# # await proc.stdin.send_all(bmsg) + +# if bmsg in _pdb_exit_patterns: +# log.info("Closing stdin hijack") +# break async def _hijack_stdin_relay_to_child( subactor_uid: Tuple[str, str] ) -> None: actor = tractor.current_actor() - proc = subactoruid2proc(actor, subactor_uid) - - # nlb = [] - - async def hijack_stdin(): - log.info(f"Hijacking stdin from {actor.uid}") - # try: - # # disable cooked mode - # fd = sys.stdin.fileno() - # old = tty.tcgetattr(fd) - # tty.setcbreak(fd) - - # trap std in and relay to subproc - async_stdin = trio.wrap_file(sys.stdin) + debug_lock = actor.statespace.setdefault( + '_debug_lock', trio.StrictFIFOLock() + ) - async with aclosing(async_stdin): - # while True: - async for msg in async_stdin: - log.trace(f"Stdin input:\n{msg}") - # nlb.append(msg) - # encode to bytes - bmsg = str.encode(msg) + log.debug(f"Actor {subactor_uid} is waiting on stdin hijack lock") + await debug_lock.acquire() + log.warning(f"Actor {subactor_uid} acquired stdin hijack lock") - # relay bytes to subproc over pipe - await proc.stdin.send_all(bmsg) + # TODO: when we get to true remote debugging + # this will deliver stdin data + try: + # indicate to child that we've locked stdio + yield 'Locked' - # line = str.encode(''.join(nlb)) - # print(line) + # wait for cancellation of stream by child + await trio.sleep_forever() - if bmsg in _pdb_exit_patterns: - log.info("Closing stdin hijack") - break - # finally: - # tty.tcsetattr(fd, tty.TCSAFLUSH, old) + # TODO: for remote debugging schedule hijacking in root scope + # (see above) + # actor._root_nursery.start_soon(hijack_stdin) - # schedule hijacking in root scope - actor._root_nursery.start_soon(hijack_stdin) + finally: + if debug_lock.locked(): + debug_lock.release() + log.debug(f"Actor {subactor_uid} released stdin hijack lock") # XXX: We only make this sync in case someone wants to @@ -84,35 +132,61 @@ def _breakpoint(debug_func) -> Awaitable[None]: in subactors. """ actor = tractor.current_actor() + do_unlock = trio.Event() - async def wait_for_parent_stdin_hijack(): - log.debug('Breakpoint engaged!') + async def wait_for_parent_stdin_hijack( + task_status=trio.TASK_STATUS_IGNORED + ): # TODO: need a more robust check for the "root" actor if actor._parent_chan: async with tractor._portal.open_portal( actor._parent_chan, start_msg_loop=False, + shield=True, ) as portal: # with trio.fail_after(1): - await portal.run( + agen = await portal.run( 'tractor._debug', '_hijack_stdin_relay_to_child', subactor_uid=actor.uid, ) + async with aclosing(agen): + async for val in agen: + assert val == 'Locked' + task_status.started() + with trio.CancelScope(shield=True): + await do_unlock.wait() + + # trigger cancellation of remote stream + break + + log.debug(f"Child {actor} released parent stdio lock") + + def unlock(_pdb): + do_unlock.set() + + global _pdb_release_hook + _pdb_release_hook = unlock + + async def _bp(): + # this must be awaited by caller + await actor._root_nursery.start( + wait_for_parent_stdin_hijack + ) # block here one frame up where ``breakpoint()`` # was awaited and begin handling stdin debug_func(actor) - # this must be awaited by caller - return wait_for_parent_stdin_hijack() + # return wait_for_parent_stdin_hijack() + return _bp() def _set_trace(actor): - pdb.set_trace( - header=f"\nAttaching pdb to actor: {actor.uid}\n", - # start 2 levels up + log.critical(f"\nAttaching pdb to actor: {actor.uid}\n") + PdbwTeardown().set_trace( + # start 2 levels up in user code frame=sys._getframe().f_back.f_back, ) @@ -125,7 +199,7 @@ def _set_trace(actor): def _post_mortem(actor): log.error(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") - pdb.post_mortem() + pdbpp.xpm(Pdb=PdbwTeardown) post_mortem = partial( diff --git a/tractor/_portal.py b/tractor/_portal.py index c84a7fe75..da8c916cb 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -22,7 +22,8 @@ @asynccontextmanager async def maybe_open_nursery( - nursery: trio.Nursery = None + nursery: trio.Nursery = None, + shield: bool = False, ) -> typing.AsyncGenerator[trio.Nursery, Any]: """Create a new nursery if None provided. @@ -32,6 +33,7 @@ async def maybe_open_nursery( yield nursery else: async with trio.open_nursery() as nursery: + nursery.cancel_scope.shield = shield yield nursery @@ -316,6 +318,7 @@ async def open_portal( channel: Channel, nursery: Optional[trio.Nursery] = None, start_msg_loop: bool = True, + shield: bool = False, ) -> typing.AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. @@ -325,7 +328,7 @@ async def open_portal( assert actor was_connected = False - async with maybe_open_nursery(nursery) as nursery: + async with maybe_open_nursery(nursery, shield=shield) as nursery: if not channel.connected(): await channel.connect() was_connected = True diff --git a/tractor/_spawn.py b/tractor/_spawn.py index b9f95f638..1d9d00d3e 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -142,7 +142,8 @@ async def cancel_on_completion( else: log.info( f"Cancelling {portal.channel.uid} gracefully " - "after result {result}") + f"after result {result}" + ) # cancel the process now that we have a final result await portal.cancel_actor()