Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

We aint got zombie shields #188

Merged
merged 5 commits into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions examples/multiple_streams_one_portal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import trio
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just an example that shows the hang in practice.

The problem is if we have a nursery exit before receiving a ctr-c we'll be stuck in the shielded proc.wait() and have no way for the user to trigger graceful exit...

import tractor


log = tractor.log.get_logger('multiportal')


async def stream_data(seed=10):
log.info("Starting stream task")

for i in range(seed):
yield i
await trio.sleep(0) # trigger scheduler


async def stream_from_portal(p, consumed):

async for item in await p.run(stream_data):
if item in consumed:
consumed.remove(item)
else:
consumed.append(item)


async def main():

async with tractor.open_nursery(loglevel='info') as an:

p = await an.start_actor('stream_boi', enable_modules=[__name__])

consumed = []

async with trio.open_nursery() as n:
for i in range(2):
n.start_soon(stream_from_portal, p, consumed)

# both streaming consumer tasks have completed and so we should
# have nothing in our list thanks to single threadedness
assert not consumed

await an.cancel()


if __name__ == '__main__':
trio.run(main)
19 changes: 11 additions & 8 deletions tractor/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import inspect
import uuid
import typing
from typing import Dict, List, Tuple, Any, Optional
from typing import Dict, List, Tuple, Any, Optional, Union
from types import ModuleType
import sys
import os
Expand Down Expand Up @@ -48,7 +48,9 @@ async def _invoke(
chan: Channel,
func: typing.Callable,
kwargs: Dict[str, Any],
task_status=trio.TASK_STATUS_IGNORED
task_status: TaskStatus[
Union[trio.CancelScope, BaseException]
] = trio.TASK_STATUS_IGNORED,
):
"""Invoke local func and deliver result(s) over provided channel.
"""
Expand Down Expand Up @@ -155,6 +157,7 @@ async def _invoke(
if cs is None:
# error is from above code not from rpc invocation
task_status.started(err)

finally:
# RPC task bookeeping
try:
Expand Down Expand Up @@ -199,7 +202,7 @@ def __init__(
self,
name: str,
*,
rpc_module_paths: List[str] = [],
enable_modules: List[str] = [],
uid: str = None,
loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None,
Expand All @@ -219,14 +222,14 @@ def __init__(
self._parent_main_data = _mp_fixup_main._mp_figure_out_main()

# always include debugging tools module
rpc_module_paths.append('tractor._debug')
enable_modules.append('tractor._debug')

mods = {}
for name in rpc_module_paths:
for name in enable_modules:
mod = importlib.import_module(name)
mods[name] = _get_mod_abspath(mod)

self.rpc_module_paths = mods
self.enable_modules = mods
self._mods: Dict[str, ModuleType] = {}

# TODO: consider making this a dynamically defined
Expand Down Expand Up @@ -293,7 +296,7 @@ def load_modules(self) -> None:
_mp_fixup_main._fixup_main_from_path(
parent_data['init_main_from_path'])

for modpath, filepath in self.rpc_module_paths.items():
for modpath, filepath in self.enable_modules.items():
# XXX append the allowed module to the python path which
# should allow for relative (at least downward) imports.
sys.path.append(os.path.dirname(filepath))
Expand All @@ -317,7 +320,7 @@ def _get_rpc_func(self, ns, funcname):
if ns == '__main__':
msg = (
"\n\nMake sure you exposed the current module using:\n\n"
"ActorNursery.start_actor(<name>, rpc_module_paths="
"ActorNursery.start_actor(<name>, enable_modules="
"[__name__])"
)

Expand Down
4 changes: 2 additions & 2 deletions tractor/_portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ async def run(
fn_name: Optional[str] = None,
**kwargs
) -> Any:
"""Submit a remote function to be scheduled and run by actor,
wrap and return its (stream of) result(s).
"""Submit a remote function to be scheduled and run by actor, in
a new task, wrap and return its (stream of) result(s).

This is a blocking call and returns either a value from the
remote rpc task or a local async generator instance.
Expand Down
18 changes: 14 additions & 4 deletions tractor/_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ async def open_root_actor(
# internal logging
loglevel: Optional[str] = None,

enable_modules: Optional[List] = None,
rpc_module_paths: Optional[List] = None,

) -> typing.Any:
Expand All @@ -58,7 +59,16 @@ async def open_root_actor(
_state._runtime_vars['_is_root'] = True

# caps based rpc list
expose_modules = rpc_module_paths or []
enable_modules = enable_modules or []

if rpc_module_paths:
warnings.warn(
"`rpc_module_paths` is now deprecated, use "
" `enable_modules` instead.",
DeprecationWarning,
stacklevel=2,
)
enable_modules.extend(rpc_module_paths)

if start_method is not None:
_spawn.try_set_start_method(start_method)
Expand All @@ -68,7 +78,7 @@ async def open_root_actor(

# expose internal debug module to every actor allowing
# for use of ``await tractor.breakpoint()``
expose_modules.append('tractor._debug')
enable_modules.append('tractor._debug')

elif debug_mode:
raise RuntimeError(
Expand Down Expand Up @@ -105,7 +115,7 @@ async def open_root_actor(
name or 'anonymous',
arbiter_addr=arbiter_addr,
loglevel=loglevel,
rpc_module_paths=expose_modules,
enable_modules=enable_modules,
)
host, port = (host, 0)

Expand All @@ -121,7 +131,7 @@ async def open_root_actor(
name or 'arbiter',
arbiter_addr=arbiter_addr,
loglevel=loglevel,
rpc_module_paths=expose_modules,
enable_modules=enable_modules,
)

try:
Expand Down
12 changes: 8 additions & 4 deletions tractor/_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ async def new_proc(
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
"rpc_module_paths": subactor.rpc_module_paths,
"enable_modules": subactor.enable_modules,
"_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1],
Expand All @@ -273,10 +273,14 @@ async def new_proc(
# ``trio.Process.__aexit__()`` (it tears down stdio
# which will kill any waiting remote pdb trace).

# TODO: No idea how we can enforce zombie
# reaping more stringently without the shield
# we used to have below...

# always "hard" join sub procs:
# no actor zombies allowed
with trio.CancelScope(shield=True):
await proc.wait()
# with trio.CancelScope(shield=True):
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main change.

Thoughts on this very welcome.

await proc.wait()
else:
# `multiprocessing`
assert _ctx
Expand All @@ -290,7 +294,7 @@ async def new_proc(
# if we're the "main" process start the forkserver
# only once and pass its ipc info to downstream
# children
# forkserver.set_forkserver_preload(rpc_module_paths)
# forkserver.set_forkserver_preload(enable_modules)
forkserver.ensure_running()
fs_info = (
fs._forkserver_address,
Expand Down
2 changes: 1 addition & 1 deletion tractor/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def current_context():


def stream(func):
"""Mark an async function as a streaming routine.
"""Mark an async function as a streaming routine with ``@stream``.
"""
func._tractor_stream_function = True
sig = inspect.signature(func)
Expand Down
38 changes: 27 additions & 11 deletions tractor/_trionics.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
"""
from functools import partial
import multiprocessing as mp
from typing import Tuple, List, Dict, Optional, Any
from typing import Tuple, List, Dict, Optional
import typing
from contextlib import AsyncExitStack
import warnings

import trio
from async_generator import asynccontextmanager

from ._state import current_actor, is_root_process, is_main_process
from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
Expand Down Expand Up @@ -56,6 +57,7 @@ async def start_actor(
*,
bind_addr: Tuple[str, int] = _default_bind_addr,
rpc_module_paths: List[str] = None,
enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
) -> Portal:
Expand All @@ -65,10 +67,21 @@ async def start_actor(
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False

enable_modules = enable_modules or []

if rpc_module_paths:
warnings.warn(
"`rpc_module_paths` is now deprecated, use "
" `enable_modules` instead.",
DeprecationWarning,
stacklevel=2,
)
enable_modules.extend(rpc_module_paths)

subactor = Actor(
name,
# modules allowed to invoked funcs from
rpc_module_paths=rpc_module_paths or [],
enable_modules=enable_modules,
loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
)
Expand Down Expand Up @@ -221,7 +234,6 @@ async def open_nursery(
# mark us for teardown on exit
implicit_runtime = True


# the collection of errors retreived from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {}

Expand Down Expand Up @@ -263,18 +275,22 @@ async def open_nursery(
# worry more are coming).
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be raised and then
# a cancel signal shows up slightly after in which case
# the `else:` block here might not complete?
# For now, shield both.
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (trio.Cancelled, KeyboardInterrupt) or (
if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err)
):
log.warning(
f"Nursery for {current_actor().uid} was "
f"cancelled with {etype}")
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
Expand Down
30 changes: 19 additions & 11 deletions tractor/msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
"""
import inspect
import typing
from typing import Dict, Any, Set, Union, Callable
from typing import Dict, Any, Set, Callable
from functools import partial
from async_generator import aclosing

import trio
import wrapt

from .log import get_logger
from . import current_actor
from ._streaming import Context

__all__ = ['pub']
Expand Down Expand Up @@ -91,6 +90,7 @@ def modify_subs(topics2ctxs, topics, ctx):


_pub_state: Dict[str, dict] = {}
_pubtask2lock: Dict[str, trio.StrictFIFOLock] = {}


def pub(
Expand Down Expand Up @@ -178,22 +178,22 @@ async def pub_service(get_topics):
subscribers. If you are ok to have a new task running for every call
to ``pub_service()`` then probably don't need this.
"""
global _pub_state
global _pubtask2lock

# handle the decorator not called with () case
if wrapped is None:
return partial(pub, tasks=tasks)

task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = {
None: trio.StrictFIFOLock()}
task2lock: Dict[str, trio.StrictFIFOLock] = {}

for name in tasks:
task2lock[name] = trio.StrictFIFOLock()

@wrapt.decorator
async def wrapper(agen, instance, args, kwargs):
# this is used to extract arguments properly as per
# the `wrapt` docs

# XXX: this is used to extract arguments properly as per the
# `wrapt` docs
async def _execute(
ctx: Context,
topics: Set[str],
Expand All @@ -203,14 +203,22 @@ async def _execute(
packetizer: Callable = None,
**kwargs,
):
if tasks and task_name is None:
if task_name is None:
task_name = trio.lowlevel.current_task().name

if tasks and task_name not in tasks:
raise TypeError(
f"{agen} must be called with a `task_name` named "
f"argument with a falue from {tasks}")
f"argument with a value from {tasks}")

elif not tasks and not task2lock:
# add a default root-task lock if none defined
task2lock[task_name] = trio.StrictFIFOLock()

_pubtask2lock.update(task2lock)

topics = set(topics)
lockmap = _pub_state.setdefault('_pubtask2lock', task2lock)
lock = lockmap[task_name]
lock = _pubtask2lock[task_name]

all_subs = _pub_state.setdefault('_subs', {})
topics2ctxs = all_subs.setdefault(task_name, {})
Expand Down