
Commit

Merge pull request #246 from overclockworked64/patch-async-enter-all
Patch async enter all (addresses #242)
goodboy authored Oct 22, 2021
2 parents e5b3eb1 + 8d9ad6b commit f4ba805
Showing 5 changed files with 72 additions and 37 deletions.
1 change: 1 addition & 0 deletions setup.py
@@ -35,6 +35,7 @@
     platforms=['linux', 'windows'],
     packages=[
         'tractor',
+        'tractor.trionics',
         'tractor.testing',
     ],
     install_requires=[
37 changes: 37 additions & 0 deletions tests/test_clustering.py
@@ -0,0 +1,37 @@
import itertools

import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import async_enter_all

from conftest import tractor_test


MESSAGE = 'tractoring at full speed'


@tractor.context
async def worker(ctx: tractor.Context) -> None:
    await ctx.started()
    async with ctx.open_stream() as stream:
        async for msg in stream:
            # do something with msg
            print(msg)
            assert msg == MESSAGE


@tractor_test
async def test_streaming_to_actor_cluster() -> None:
    async with (
        open_actor_cluster(modules=[__name__]) as portals,
        async_enter_all(
            mngrs=[p.open_context(worker) for p in portals.values()],
        ) as contexts,
        async_enter_all(
            mngrs=[ctx[0].open_stream() for ctx in contexts],
        ) as streams,
    ):
        with trio.move_on_after(1):
            for stream in itertools.cycle(streams):
                await stream.send(MESSAGE)
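
For comparison, a rough sketch (not part of this commit) of the same fan-out written without the new helper, entering each manager sequentially with contextlib.AsyncExitStack. It assumes the worker coroutine and MESSAGE constant from the test module above and runs under the same @tractor_test harness; the test name is invented:

import contextlib


@tractor_test
async def test_streaming_without_helper() -> None:
    async with tractor.open_actor_cluster(modules=[__name__]) as portals:
        async with contextlib.AsyncExitStack() as stack:
            # enter each (ctx, first_result) pair one at a time
            contexts = [
                await stack.enter_async_context(p.open_context(worker))
                for p in portals.values()
            ]
            # then open a stream on each entered context, again sequentially
            streams = [
                await stack.enter_async_context(ctx.open_stream())
                for ctx, _first in contexts
            ]
            for stream in streams:
                await stream.send(MESSAGE)

async_enter_all replaces this one-at-a-time entry with concurrent entry, one background task per manager, which is what the new test exercises.
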
2 changes: 2 additions & 0 deletions tractor/__init__.py
@@ -4,6 +4,7 @@
 """
 from trio import MultiError
 
+from ._clustering import open_actor_cluster
 from ._ipc import Channel
 from ._streaming import (
     Context,
@@ -39,6 +40,7 @@
     'get_arbiter',
     'is_root_process',
     'msg',
+    'open_actor_cluster',
     'open_nursery',
     'open_root_actor',
     'Portal',
31 changes: 17 additions & 14 deletions tractor/_clustering.py
@@ -2,6 +2,8 @@
 Actor cluster helpers.
 '''
+from __future__ import annotations
+
 from contextlib import asynccontextmanager as acm
 from multiprocessing import cpu_count
 from typing import AsyncGenerator, Optional
@@ -12,39 +14,40 @@
 
 @acm
 async def open_actor_cluster(
+
     modules: list[str],
     count: int = cpu_count(),
     names: Optional[list[str]] = None,
+
+    start_method: Optional[str] = None,
     hard_kill: bool = False,
 ) -> AsyncGenerator[
     list[str],
     dict[str, tractor.Portal]
 ]:
+
     portals: dict[str, tractor.Portal] = {}
+    uid = tractor.current_actor().uid
 
     if not names:
-        suffix = '_'.join(uid)
-        names = [f'worker_{i}.' + suffix for i in range(count)]
+        names = [f'worker_{i}' for i in range(count)]
 
     if not len(names) == count:
         raise ValueError(
             'Number of names is {len(names)} but count it {count}')
 
-    async with tractor.open_nursery() as an:
+    async with tractor.open_nursery(start_method=start_method) as an:
         async with trio.open_nursery() as n:
-            for index, key in zip(range(count), names):
-                uid = tractor.current_actor().uid
-
-                async def start(i) -> None:
-                    key = f'worker_{i}.' + '_'.join(uid)
-                    portals[key] = await an.start_actor(
-                        enable_modules=modules,
-                        name=key,
-                    )
+            async def _start(name: str) -> None:
+                name = f'{name}.{uid}'
+                portals[name] = await an.start_actor(
+                    enable_modules=modules,
+                    name=name,
+                )
 
-                n.start_soon(start, index)
+            for name in names:
+                n.start_soon(_start, name)
 
         assert len(portals) == count
         yield portals
 
         await an.cancel(hard_kill=hard_kill)
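
A minimal usage sketch of the updated open_actor_cluster() signature (not from this changeset); the module path, worker count, and the 'trio' start-method value are illustrative assumptions:

import trio
import tractor


async def main() -> None:
    # open_actor_cluster() reads tractor.current_actor().uid to suffix the
    # worker names, so it must run inside a (root) actor
    async with tractor.open_root_actor():
        async with tractor.open_actor_cluster(
            modules=['mypkg.workers'],  # hypothetical module each worker enables
            count=4,                    # defaults to cpu_count()
            start_method='trio',        # assumed spawn-backend name, now forwarded to open_nursery()
            hard_kill=False,            # forwarded to the nursery cancel on exit
        ) as portals:
            # portals maps each generated 'worker_<i>' name (suffixed with the
            # root actor's uid) to a tractor.Portal
            print(sorted(portals))


if __name__ == '__main__':
    trio.run(main)
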
38 changes: 15 additions & 23 deletions tractor/trionics/_mngrs.py
@@ -2,8 +2,8 @@
 Async context manager primitives with hard ``trio``-aware semantics
 '''
-from typing import AsyncContextManager
-from typing import TypeVar
+from typing import AsyncContextManager, AsyncGenerator
+from typing import TypeVar, Sequence
 from contextlib import asynccontextmanager as acm
 
 import trio
@@ -13,52 +13,44 @@
 T = TypeVar("T")
 
 
-async def _enter_and_sleep(
-
+async def _enter_and_wait(
     mngr: AsyncContextManager[T],
-    to_yield: dict[int, T],
+    unwrapped: dict[int, T],
     all_entered: trio.Event,
-    # task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
-
-) -> T:
+) -> None:
     '''Open the async context manager deliver it's value
     to this task's spawner and sleep until cancelled.
     '''
     async with mngr as value:
-        to_yield[id(mngr)] = value
+        unwrapped[id(mngr)] = value
 
-        if all(to_yield.values()):
+        if all(unwrapped.values()):
             all_entered.set()
 
-        # sleep until cancelled
         await trio.sleep_forever()
 
 
 @acm
 async def async_enter_all(
-
-    *mngrs: tuple[AsyncContextManager[T]],
-
-) -> tuple[T]:
-
-    to_yield = {}.fromkeys(id(mngr) for mngr in mngrs)
+    mngrs: Sequence[AsyncContextManager[T]],
+) -> AsyncGenerator[tuple[T, ...], None]:
+    unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)
 
     all_entered = trio.Event()
 
     async with trio.open_nursery() as n:
         for mngr in mngrs:
             n.start_soon(
-                _enter_and_sleep,
+                _enter_and_wait,
                 mngr,
-                to_yield,
+                unwrapped,
                 all_entered,
             )
 
-        # deliver control once all managers have started up
         await all_entered.wait()
-        yield tuple(to_yield.values())
 
-        # tear down all sleeper tasks thus triggering individual
-        # mngr ``__aexit__()``s.
-        n.cancel_scope.cancel()
+        yield tuple(unwrapped.values())
+
+        n.cancel_scope.cancel()
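
To make the reworked helper's semantics concrete, a small self-contained sketch (not part of the commit) driving async_enter_all with plain asynccontextmanagers; the resource() manager and its delays are invented for illustration:

from contextlib import asynccontextmanager

import trio

from tractor.trionics import async_enter_all


@asynccontextmanager
async def resource(tag: str, delay: float):
    # stand-in for some slow async setup
    await trio.sleep(delay)
    yield tag
    # teardown runs once async_enter_all() cancels its waiter tasks


async def main() -> None:
    # each manager is entered in its own task; the tuple of yielded values
    # is delivered only after every manager has been entered
    async with async_enter_all(
        mngrs=[resource('a', 0.1), resource('b', 0.2)],
    ) as values:
        assert values == ('a', 'b')


trio.run(main)

Each manager is entered concurrently in its own nursery task, the yielded tuple is delivered once all_entered fires, and leaving the async with block cancels the sleeping tasks, which in turn runs each manager's __aexit__().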
