-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Trionics #241
Merged
Trionics #241
Changes from all commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
340ddba
Rename the nursery module to `_supervise`
goodboy 680a841
Start `trionics` sub-pkg with `async_enter_all()`
goodboy 4114eb1
Move broadcast channel parts into trionics
goodboy 1e917fd
Add an async actor cluster spawner prototype
goodboy 79fb1d0
Fix top level nursery import
goodboy 97006c9
Expose `Lagged` for broadcasting
goodboy 8ba1031
Fix type path to new `_supervise` mod
goodboy 9ddd757
Lul, fix everything for cluster helper
goodboy c372367
Fix *args-like type annot
goodboy 76767a3
Add 'trio.trionics' to setup.py
overclockworked64 7d502ce
Add 'open_actor_cluster' to __all__
overclockworked64 21afc69
Postpone evaluation of annotations
overclockworked64 2815f1c
Make 'async_enter_all' take a teardown trigger which '_enter_and_wait…
overclockworked64 73cbb23
Avoid RuntimeError by not using current_actor's uid
overclockworked64 6e6baf2
Make sure the ID is a str
overclockworked64 6f9229c
Cancel nursery
overclockworked64 3130a04
Rename a variable and fix type annotations
overclockworked64 c1089db
Add a clustering test
overclockworked64 b7a4641
Allow specifying start_method and hard_kill
overclockworked64 04895b9
Get rid of dumb random uid and use current actor's uid
overclockworked64 87e3d32
Get rid of external teardown trigger because #245 resolves the problem
overclockworked64 b91adcf
Get rid of external teardown trigger
overclockworked64 71b8f9f
Merge pull request #252 from goodboy/246_facepalm_backup
goodboy 5040035
Fix type annotations
overclockworked64 ebf080b
Merge pull request #253 from overclockworked64/fix-type-annotation
goodboy d0f5c7a
Change to `gather_contexts()`, use event for graceful exit
goodboy 925af28
Merge pull request #254 from goodboy/graceful_gather
goodboy 083b73a
Test: don't grab debug lock if not in mode
goodboy c7f59bd
Add a news fragment
overclockworked64 49dd230
Add a newline
overclockworked64 6da7694
Fix the syntax and point to the new package
overclockworked64 9c13827
Merge pull request #256 from overclockworked64/241-news-fragment
goodboy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
Introduce a new `sub-package`_ that exposes all our high(er) level trio primitives and goodies, most importantly: | ||
|
||
- A new ``open_actor_cluster`` procedure is available for concurrently spawning a number of actors. | ||
- A new ``gather_contexts`` procedure is available for concurrently entering a sequence of async context managers. | ||
|
||
.. _sub-package: ../tractor/trionics |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import itertools | ||
|
||
import trio | ||
import tractor | ||
from tractor import open_actor_cluster | ||
from tractor.trionics import gather_contexts | ||
|
||
from conftest import tractor_test | ||
|
||
|
||
MESSAGE = 'tractoring at full speed' | ||
|
||
|
||
@tractor.context | ||
async def worker(ctx: tractor.Context) -> None: | ||
await ctx.started() | ||
async with ctx.open_stream() as stream: | ||
async for msg in stream: | ||
# do something with msg | ||
print(msg) | ||
assert msg == MESSAGE | ||
|
||
|
||
@tractor_test | ||
async def test_streaming_to_actor_cluster() -> None: | ||
async with ( | ||
open_actor_cluster(modules=[__name__]) as portals, | ||
gather_contexts( | ||
mngrs=[p.open_context(worker) for p in portals.values()], | ||
) as contexts, | ||
gather_contexts( | ||
mngrs=[ctx[0].open_stream() for ctx in contexts], | ||
) as streams, | ||
): | ||
with trio.move_on_after(1): | ||
for stream in itertools.cycle(streams): | ||
await stream.send(MESSAGE) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
''' | ||
Actor cluster helpers. | ||
|
||
''' | ||
from __future__ import annotations | ||
|
||
from contextlib import asynccontextmanager as acm | ||
from multiprocessing import cpu_count | ||
from typing import AsyncGenerator, Optional | ||
|
||
import trio | ||
import tractor | ||
|
||
|
||
@acm | ||
async def open_actor_cluster( | ||
modules: list[str], | ||
count: int = cpu_count(), | ||
names: Optional[list[str]] = None, | ||
start_method: Optional[str] = None, | ||
hard_kill: bool = False, | ||
) -> AsyncGenerator[ | ||
dict[str, tractor.Portal], | ||
None, | ||
]: | ||
|
||
portals: dict[str, tractor.Portal] = {} | ||
|
||
if not names: | ||
names = [f'worker_{i}' for i in range(count)] | ||
|
||
if not len(names) == count: | ||
raise ValueError( | ||
'Number of names is {len(names)} but count it {count}') | ||
|
||
async with tractor.open_nursery(start_method=start_method) as an: | ||
async with trio.open_nursery() as n: | ||
uid = tractor.current_actor().uid | ||
|
||
async def _start(name: str) -> None: | ||
name = f'{name}.{uid}' | ||
portals[name] = await an.start_actor( | ||
enable_modules=modules, | ||
name=name, | ||
) | ||
|
||
for name in names: | ||
n.start_soon(_start, name) | ||
|
||
assert len(portals) == count | ||
yield portals | ||
|
||
await an.cancel(hard_kill=hard_kill) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
''' | ||
Sugary patterns for trio + tractor designs. | ||
|
||
''' | ||
from ._mngrs import gather_contexts | ||
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged | ||
|
||
|
||
__all__ = [ | ||
'gather_contexts', | ||
'broadcast_receiver', | ||
'BroadcastReceiver', | ||
'Lagged', | ||
] |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
''' | ||
Async context manager primitives with hard ``trio``-aware semantics | ||
|
||
''' | ||
from typing import AsyncContextManager, AsyncGenerator | ||
from typing import TypeVar, Sequence | ||
from contextlib import asynccontextmanager as acm | ||
|
||
import trio | ||
|
||
|
||
# A regular invariant generic type | ||
T = TypeVar("T") | ||
|
||
|
||
async def _enter_and_wait( | ||
|
||
mngr: AsyncContextManager[T], | ||
unwrapped: dict[int, T], | ||
all_entered: trio.Event, | ||
parent_exit: trio.Event, | ||
|
||
) -> None: | ||
''' | ||
Open the async context manager deliver it's value | ||
to this task's spawner and sleep until cancelled. | ||
|
||
''' | ||
async with mngr as value: | ||
unwrapped[id(mngr)] = value | ||
|
||
if all(unwrapped.values()): | ||
all_entered.set() | ||
|
||
await parent_exit.wait() | ||
|
||
|
||
@acm | ||
async def gather_contexts( | ||
|
||
mngrs: Sequence[AsyncContextManager[T]], | ||
|
||
) -> AsyncGenerator[tuple[T, ...], None]: | ||
''' | ||
Concurrently enter a sequence of async context managers, each in | ||
a separate ``trio`` task and deliver the unwrapped values in the | ||
same order once all managers have entered. On exit all contexts are | ||
subsequently and concurrently exited. | ||
|
||
This function is somewhat similar to common usage of | ||
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in | ||
combo with ``asyncio.gather()`` except the managers are concurrently | ||
entered and exited cancellation just works. | ||
|
||
''' | ||
unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs) | ||
|
||
all_entered = trio.Event() | ||
parent_exit = trio.Event() | ||
|
||
async with trio.open_nursery() as n: | ||
for mngr in mngrs: | ||
n.start_soon( | ||
_enter_and_wait, | ||
mngr, | ||
unwrapped, | ||
all_entered, | ||
parent_exit, | ||
) | ||
|
||
# deliver control once all managers have started up | ||
await all_entered.wait() | ||
|
||
yield tuple(unwrapped.values()) | ||
|
||
# we don't need a try/finally since cancellation will be triggered | ||
# by the surrounding nursery on error. | ||
parent_exit.set() |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be the answer to the process pool question right? looks great!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More or less yah.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the last question here is how do we export this?
Should it be top level
from tractor import open_actor_cluster
or maybe should be export via something liketractor.builtin
,tractor.extras
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd also change
async_enter_all
tomass_aenter
, for example.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm,
<something_aenter()
is interesting, though i wonder if we should try to mimic whatcontextlib
stack apis offer? Not that any of them have this kind of interface per say.mass_
seems a bit foreign to me.I think we need to emphasize that the entering is done concurrently, not just that
__aenter__()
is being called (which is obviously async). I think in worker pool parlance this is done with something likepool.map()
but the difference here is that we're not running functions and collecting output - it's more of a setup/teardown type thing..There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about
enter_all_soon()
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AsyncExitStack
api for reference.