Skip to content

Commit

Permalink
Merge pull request #241 from goodboy/trionics
Browse files Browse the repository at this point in the history
Trionics
  • Loading branch information
goodboy authored Oct 27, 2021
2 parents be5582a + 9c13827 commit 5dbe8e4
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 5 deletions.
2 changes: 1 addition & 1 deletion examples/debugging/fast_error_in_root_after_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async def sleep(


async def open_ctx(
n: tractor._trionics.ActorNursery
n: tractor._supervise.ActorNursery
):

# spawn both actors
Expand Down
6 changes: 6 additions & 0 deletions newsfragments/241.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Introduce a new `sub-package`_ that exposes all our high(er) level trio primitives and goodies, most importantly:

- A new ``open_actor_cluster`` procedure is available for concurrently spawning a number of actors.
- A new ``gather_contexts`` procedure is available for concurrently entering a sequence of async context managers.

.. _sub-package: ../tractor/trionics
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
platforms=['linux', 'windows'],
packages=[
'tractor',
'tractor.trionics',
'tractor.testing',
],
install_requires=[
Expand Down
37 changes: 37 additions & 0 deletions tests/test_clustering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import itertools

import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import gather_contexts

from conftest import tractor_test


MESSAGE = 'tractoring at full speed'


@tractor.context
async def worker(ctx: tractor.Context) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async for msg in stream:
# do something with msg
print(msg)
assert msg == MESSAGE


@tractor_test
async def test_streaming_to_actor_cluster() -> None:
async with (
open_actor_cluster(modules=[__name__]) as portals,
gather_contexts(
mngrs=[p.open_context(worker) for p in portals.values()],
) as contexts,
gather_contexts(
mngrs=[ctx[0].open_stream() for ctx in contexts],
) as streams,
):
with trio.move_on_after(1):
for stream in itertools.cycle(streams):
await stream.send(MESSAGE)
3 changes: 1 addition & 2 deletions tests/test_task_broadcasting.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import trio
from trio.lowlevel import current_task
import tractor
from tractor._broadcast import broadcast_receiver, Lagged
from tractor.trionics import broadcast_receiver, Lagged


@tractor.context
Expand Down Expand Up @@ -432,7 +432,6 @@ async def main():
tx, rx = trio.open_memory_channel(1)
brx = broadcast_receiver(rx, 1)
cs = trio.CancelScope()
sequence = list(range(3))

async def sub_and_recv():
with cs:
Expand Down
4 changes: 3 additions & 1 deletion tractor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
from trio import MultiError

from ._clustering import open_actor_cluster
from ._ipc import Channel
from ._streaming import (
Context,
Expand All @@ -13,7 +14,7 @@
context,
)
from ._discovery import get_arbiter, find_actor, wait_for_actor
from ._trionics import open_nursery
from ._supervise import open_nursery
from ._state import current_actor, is_root_process
from ._exceptions import (
RemoteActorError,
Expand All @@ -39,6 +40,7 @@
'get_arbiter',
'is_root_process',
'msg',
'open_actor_cluster',
'open_nursery',
'open_root_actor',
'Portal',
Expand Down
53 changes: 53 additions & 0 deletions tractor/_clustering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'''
Actor cluster helpers.
'''
from __future__ import annotations

from contextlib import asynccontextmanager as acm
from multiprocessing import cpu_count
from typing import AsyncGenerator, Optional

import trio
import tractor


@acm
async def open_actor_cluster(
modules: list[str],
count: int = cpu_count(),
names: Optional[list[str]] = None,
start_method: Optional[str] = None,
hard_kill: bool = False,
) -> AsyncGenerator[
dict[str, tractor.Portal],
None,
]:

portals: dict[str, tractor.Portal] = {}

if not names:
names = [f'worker_{i}' for i in range(count)]

if not len(names) == count:
raise ValueError(
'Number of names is {len(names)} but count it {count}')

async with tractor.open_nursery(start_method=start_method) as an:
async with trio.open_nursery() as n:
uid = tractor.current_actor().uid

async def _start(name: str) -> None:
name = f'{name}.{uid}'
portals[name] = await an.start_actor(
enable_modules=modules,
name=name,
)

for name in names:
n.start_soon(_start, name)

assert len(portals) == count
yield portals

await an.cancel(hard_kill=hard_kill)
3 changes: 3 additions & 0 deletions tractor/_debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,9 @@ async def acquire_debug_lock(
Grab root's debug lock on entry, release on exit.
'''
if not debug_mode():
return

async with trio.open_nursery() as n:
cs = await n.start(
wait_for_parent_stdin_hijack,
Expand Down
2 changes: 1 addition & 1 deletion tractor/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from ._ipc import Channel
from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor
from ._broadcast import broadcast_receiver, BroadcastReceiver
from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver


log = get_logger(__name__)
Expand Down
File renamed without changes.
14 changes: 14 additions & 0 deletions tractor/trionics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'''
Sugary patterns for trio + tractor designs.
'''
from ._mngrs import gather_contexts
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged


__all__ = [
'gather_contexts',
'broadcast_receiver',
'BroadcastReceiver',
'Lagged',
]
File renamed without changes.
78 changes: 78 additions & 0 deletions tractor/trionics/_mngrs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
'''
Async context manager primitives with hard ``trio``-aware semantics
'''
from typing import AsyncContextManager, AsyncGenerator
from typing import TypeVar, Sequence
from contextlib import asynccontextmanager as acm

import trio


# A regular invariant generic type
T = TypeVar("T")


async def _enter_and_wait(

mngr: AsyncContextManager[T],
unwrapped: dict[int, T],
all_entered: trio.Event,
parent_exit: trio.Event,

) -> None:
'''
Open the async context manager deliver it's value
to this task's spawner and sleep until cancelled.
'''
async with mngr as value:
unwrapped[id(mngr)] = value

if all(unwrapped.values()):
all_entered.set()

await parent_exit.wait()


@acm
async def gather_contexts(

mngrs: Sequence[AsyncContextManager[T]],

) -> AsyncGenerator[tuple[T, ...], None]:
'''
Concurrently enter a sequence of async context managers, each in
a separate ``trio`` task and deliver the unwrapped values in the
same order once all managers have entered. On exit all contexts are
subsequently and concurrently exited.
This function is somewhat similar to common usage of
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
combo with ``asyncio.gather()`` except the managers are concurrently
entered and exited cancellation just works.
'''
unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)

all_entered = trio.Event()
parent_exit = trio.Event()

async with trio.open_nursery() as n:
for mngr in mngrs:
n.start_soon(
_enter_and_wait,
mngr,
unwrapped,
all_entered,
parent_exit,
)

# deliver control once all managers have started up
await all_entered.wait()

yield tuple(unwrapped.values())

# we don't need a try/finally since cancellation will be triggered
# by the surrounding nursery on error.
parent_exit.set()

0 comments on commit 5dbe8e4

Please sign in to comment.