Skip to content

Commit

Permalink
Support "infected asyncio" actors
Browse files Browse the repository at this point in the history
This is an initial solution for #120.

Allow spawning `asyncio` based actors which run `trio` in guest
mode. This enables spawning `tractor` actors on top of the `asyncio`
event loop whilst still leveraging the SC focused internal actor
supervision machinery. Add a `tractor.to_syncio.run()` api to allow
spawning tasks on the `asyncio` loop from an embedded (remote) `trio`
task and return or stream results all the way back through the `tractor`
IPC system using a very similar api to portals.

One outstanding problem is getting SC around calls to
`asyncio.create_task()`. Currently a task that crashes isn't able to
easily relay the error to the embedded `trio` task without us fully
enforcing the portals based message protocol (which seems superfluous
given the error ref is in process). Further experiments using `anyio`
task groups may alleviate this.
  • Loading branch information
goodboy committed Jun 28, 2020
1 parent bdcb5db commit 15a3411
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 20 deletions.
2 changes: 2 additions & 0 deletions tractor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ._exceptions import RemoteActorError, ModuleNotExposed
from . import msg
from . import _spawn
from . import to_asyncio


__all__ = [
Expand All @@ -35,6 +36,7 @@
'RemoteActorError',
'ModuleNotExposed',
'msg'
'to_asyncio'
]


Expand Down
66 changes: 60 additions & 6 deletions tractor/_entry.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,70 @@
"""
Process entry points.
"""
import asyncio
from functools import partial
from typing import Tuple, Any
from typing import Tuple, Any, Awaitable

import trio # type: ignore

from ._actor import Actor
from .log import get_console_log, get_logger
from . import _state


__all__ = ('run',)


log = get_logger(__name__)


def _asyncio_main(
trio_main: Awaitable,
) -> None:
"""Entry for an "infected ``asyncio`` actor".
Uh, oh. :o
It looks like your event loop has caught a case of the ``trio``s.
:()
Don't worry, we've heard you'll barely notice. You might hallucinate
a few more propagating errors and feel like your digestion has
slowed but if anything get's too bad your parents will know about
it.
:)
"""
async def aio_main(trio_main):
loop = asyncio.get_running_loop()

trio_done_fut = asyncio.Future()

def trio_done_callback(main_outcome):
log.info(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)

# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)

(await trio_done_fut).unwrap()

asyncio.run(aio_main(trio_main))


def _mp_main(
actor,
actor: 'Actor',
accept_addr: Tuple[str, int],
forkserver_info: Tuple[Any, Any, Any, Any, Any],
start_method: str,
parent_addr: Tuple[str, int] = None
parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
"""The routine called *after fork* which invokes a fresh ``trio.run``
"""
Expand All @@ -38,16 +84,24 @@ def _mp_main(
_state._current_actor = actor

log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
actor._async_main,
accept_addr,
parent_addr=parent_addr
)
try:
trio.run(partial(
actor._async_main, accept_addr, parent_addr=parent_addr))
if infect_asyncio:
actor._infected_aio = True
_asyncio_main(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt:
pass # handle it the same way trio does?
log.info(f"Actor {actor.uid} terminated")


async def _trip_main(
actor,
actor: 'Actor',
accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None
) -> None:
Expand Down
13 changes: 10 additions & 3 deletions tractor/_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from .log import get_logger
from ._portal import Portal
from ._actor import Actor, ActorFailure
from ._entry import _mp_main, _trip_main


log = get_logger('tractor')
Expand Down Expand Up @@ -161,6 +162,7 @@ async def new_proc(
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
use_trio_run_in_process: bool = False,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
"""Create a new ``multiprocessing.Process`` using the
Expand All @@ -173,9 +175,12 @@ async def new_proc(

async with trio.open_nursery() as nursery:
if use_trio_run_in_process or _spawn_method == 'trio_run_in_process':
if infect_asyncio:
raise NotImplementedError("Asyncio is incompatible with trip")
# trio_run_in_process
async with trio_run_in_process.open_in_process(
subactor._trip_main,
_trip_main,
subactor,
bind_addr,
parent_addr,
) as proc:
Expand Down Expand Up @@ -235,12 +240,14 @@ async def new_proc(
fs_info = (None, None, None, None, None)

proc = _ctx.Process( # type: ignore
target=subactor._mp_main,
target=_mp_main,
args=(
subactor,
bind_addr,
fs_info,
start_method,
parent_addr
parent_addr,
infect_asyncio,
),
# daemon=True,
name=name,
Expand Down
29 changes: 18 additions & 11 deletions tractor/_trionics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
``trio`` inspired apis and helpers
"""
from functools import partial
import multiprocessing as mp
from typing import Tuple, List, Dict, Optional, Any
import typing
Expand All @@ -10,7 +11,7 @@

from ._state import current_actor
from .log import get_logger, get_loglevel
from ._actor import Actor # , ActorFailure
from ._actor import Actor
from ._portal import Portal
from . import _spawn

Expand Down Expand Up @@ -51,6 +52,7 @@ async def start_actor(
rpc_module_paths: List[str] = None,
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
infect_asyncio: bool = False,
) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel()

Expand All @@ -71,13 +73,16 @@ async def start_actor(

# XXX: the type ignore is actually due to a `mypy` bug
return await nursery.start( # type: ignore
_spawn.new_proc,
name,
self,
subactor,
self.errors,
bind_addr,
parent_addr,
partial(
_spawn.new_proc,
name,
self,
subactor,
self.errors,
bind_addr,
parent_addr,
infect_asyncio=infect_asyncio,
)
)

async def run_in_actor(
Expand All @@ -88,6 +93,7 @@ async def run_in_actor(
rpc_module_paths: Optional[List[str]] = None,
statespace: Dict[str, Any] = None,
loglevel: str = None, # set log level per subactor
infect_asyncio: bool = False,
**kwargs, # explicit args to ``fn``
) -> Portal:
"""Spawn a new actor, run a lone task, then terminate the actor and
Expand All @@ -106,6 +112,7 @@ async def run_in_actor(
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._ria_nursery,
infect_asyncio=infect_asyncio,
)
# this marks the actor to be cancelled after its portal result
# is retreived, see logic in `open_nursery()` below.
Expand All @@ -131,7 +138,7 @@ def do_hard_kill(proc):
# send KeyBoardInterrupt (trio abort signal) to sub-actors
# os.kill(proc.pid, signal.SIGINT)

log.debug(f"Cancelling nursery")
log.debug("Cancelling nursery")
with trio.move_on_after(3) as cs:
async with trio.open_nursery() as nursery:
for subactor, proc, portal in self._children.values():
Expand Down Expand Up @@ -260,7 +267,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:

# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
log.debug(f"Waiting on all subactors to complete")
log.debug("Waiting on all subactors to complete")
anursery._join_procs.set()

# ria_nursery scope end
Expand Down Expand Up @@ -293,4 +300,4 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:

# ria_nursery scope end

log.debug(f"Nursery teardown complete")
log.debug("Nursery teardown complete")
70 changes: 70 additions & 0 deletions tractor/to_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
"""
import asyncio
import inspect
from typing import (
Any,
Callable,
AsyncGenerator,
Awaitable,
Union,
)

import trio


async def _invoke(
from_trio,
to_trio,
coro
) -> Union[AsyncGenerator, Awaitable]:
"""Await or stream awaiable object based on type into
``trio`` memory channel.
"""
async def stream_from_gen(c):
async for item in c:
to_trio.put_nowait(item)
to_trio.put_nowait

async def just_return(c):
to_trio.put_nowait(await c)

if inspect.isasyncgen(coro):
return await stream_from_gen(coro)
elif inspect.iscoroutine(coro):
return await coro


# TODO: make this some kind of tractor.to_asyncio.run()
async def run(
func: Callable,
qsize: int = 2**10,
**kwargs,
) -> Any:
"""Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
"""
# ITC (inter task comms)
from_trio = asyncio.Queue(qsize)
to_trio, from_aio = trio.open_memory_channel(qsize)

# allow target func to accept/stream results manually
kwargs['to_trio'] = to_trio
kwargs['from_trio'] = to_trio

coro = func(**kwargs)

# start the asyncio task we submitted from trio
# TODO: try out ``anyio`` asyncio based tg here
asyncio.create_task(_invoke(from_trio, to_trio, coro))

# determine return type async func vs. gen
if inspect.isasyncgen(coro):
await from_aio.get()
elif inspect.iscoroutine(coro):
async def gen():
async for tick in from_aio:
yield tuple(tick)

return gen()

0 comments on commit 15a3411

Please sign in to comment.