Trionics #241
@@ -0,0 +1,37 @@
import itertools

import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import async_enter_all

from conftest import tractor_test


MESSAGE = 'tractoring at full speed'


@tractor.context
async def worker(ctx: tractor.Context) -> None:
    await ctx.started()
    async with ctx.open_stream() as stream:
        async for msg in stream:
            # do something with msg
            print(msg)
            assert msg == MESSAGE


@tractor_test
async def test_streaming_to_actor_cluster() -> None:
    async with (
        open_actor_cluster(modules=[__name__]) as portals,
        async_enter_all(
            mngrs=[p.open_context(worker) for p in portals.values()],
        ) as contexts,
        async_enter_all(
            mngrs=[ctx[0].open_stream() for ctx in contexts],
        ) as streams,
    ):
        with trio.move_on_after(1):
            for stream in itertools.cycle(streams):
                await stream.send(MESSAGE)
@@ -0,0 +1,53 @@
'''
Actor cluster helpers.

'''
from __future__ import annotations

from contextlib import asynccontextmanager as acm
from multiprocessing import cpu_count
from typing import AsyncGenerator, Optional

import trio
import tractor


@acm
async def open_actor_cluster(
    modules: list[str],
    count: int = cpu_count(),
    names: Optional[list[str]] = None,
    start_method: Optional[str] = None,
    hard_kill: bool = False,
) -> AsyncGenerator[
    dict[str, tractor.Portal],
    None,
]:

    portals: dict[str, tractor.Portal] = {}

    if not names:
        names = [f'worker_{i}' for i in range(count)]

    if not len(names) == count:
        raise ValueError(
            f'Number of names is {len(names)} but count is {count}')

    async with tractor.open_nursery(start_method=start_method) as an:
        async with trio.open_nursery() as n:
            uid = tractor.current_actor().uid

            async def _start(name: str) -> None:
                name = f'{name}.{uid}'
                portals[name] = await an.start_actor(
                    enable_modules=modules,
                    name=name,
                )

            for name in names:
                n.start_soon(_start, name)

        assert len(portals) == count
        yield portals

        await an.cancel(hard_kill=hard_kill)
@@ -0,0 +1,14 @@
'''
Sugary patterns for trio + tractor designs.

'''
from ._mngrs import async_enter_all
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged


__all__ = [
    'async_enter_all',
    'broadcast_receiver',
    'BroadcastReceiver',
    'Lagged',
]
@@ -0,0 +1,56 @@
'''
Async context manager primitives with hard ``trio``-aware semantics

'''
from typing import AsyncContextManager, AsyncGenerator
from typing import TypeVar, Sequence
from contextlib import asynccontextmanager as acm

import trio


# A regular invariant generic type
T = TypeVar("T")


async def _enter_and_wait(
    mngr: AsyncContextManager[T],
    unwrapped: dict[int, T],
    all_entered: trio.Event,
) -> None:
    '''Open the async context manager, deliver its value
    to this task's spawner and sleep until cancelled.

    '''
    async with mngr as value:
        unwrapped[id(mngr)] = value

        if all(unwrapped.values()):
            all_entered.set()

        await trio.sleep_forever()
Review comment:
My one nitpick as a lurker is that with this cancel-to-exit strategy, all managers will exit via an "unclean" path; in other words, their __aexit__() methods will see a cancellation rather than a normal exit. I'm not sure if that is actually important? It just gives me an itch. Could replace it with an event pretty easily.

Reply:
Ahh good point. I don't see any reason not to use an event, and you're right about the mngrs themselves then getting expected inputs on different exit conditions.
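A minimal sketch of the event-based alternative discussed above, assuming a hypothetical teardown_trigger event threaded into the task in place of the cancel-to-exit strategy (the parameter name and wiring are illustrative, not part of this PR):

async def _enter_and_wait(
    mngr: AsyncContextManager[T],
    unwrapped: dict[int, T],
    all_entered: trio.Event,
    teardown_trigger: trio.Event,  # hypothetical extra parameter
) -> None:
    async with mngr as value:
        unwrapped[id(mngr)] = value
        if all(unwrapped.values()):
            all_entered.set()
        # wait for an explicit signal instead of sleeping until
        # cancelled, so each mngr's __aexit__() runs on a clean,
        # non-cancelled code path
        await teardown_trigger.wait()

Under this scheme async_enter_all() would call teardown_trigger.set() where it currently calls n.cancel_scope.cancel().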
@acm
async def async_enter_all(
    mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[tuple[T, ...], None]:
    unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)

    all_entered = trio.Event()

    async with trio.open_nursery() as n:
        for mngr in mngrs:
            n.start_soon(
                _enter_and_wait,
                mngr,
                unwrapped,
                all_entered,
            )

        # deliver control once all managers have started up
        await all_entered.wait()

        yield tuple(unwrapped.values())

        n.cancel_scope.cancel()
Review comment:
This would be the answer to the process pool question, right? Looks great!

Reply:
More or less, yah.
Review comment:
I guess the last question here is how do we export this? Should it be top level, from tractor import open_actor_cluster, or maybe should it be exported via something like tractor.builtin or tractor.extras?

Reply:
I'd also change async_enter_all to mass_aenter, for example.

Reply:
Hmm, <something>_aenter() is interesting, though I wonder if we should try to mimic what the contextlib stack APIs offer? Not that any of them have this kind of interface per se; mass_ seems a bit foreign to me. I think we need to emphasize that the entering is done concurrently, not just that __aenter__() is being called (which is obviously async). I think in worker-pool parlance this is done with something like pool.map(), but the difference here is that we're not running functions and collecting output; it's more of a setup/teardown type thing.

Reply:
What about enter_all_soon()?

Reply:
AsyncExitStack API for reference.
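For comparison, a sketch (not from this PR; the helper name is illustrative) of entering the same managers with the stdlib contextlib.AsyncExitStack referenced above. Unlike async_enter_all(), the stack enters them one at a time, so startup latency is the sum of the individual __aenter__() calls rather than roughly the slowest one:

from contextlib import AsyncExitStack, asynccontextmanager

@asynccontextmanager
async def enter_sequentially(mngrs):
    # stdlib pattern: each __aenter__() runs back-to-back on a
    # single task; exits then run in LIFO order on the way out
    async with AsyncExitStack() as stack:
        yield tuple([
            await stack.enter_async_context(mngr)
            for mngr in mngrs
        ])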