Skip to content

Commit

Permalink
Add maybe_open_context() an actor wide task-resource cache
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Dec 3, 2021
1 parent 3f6099f commit ef5162a
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 4 deletions.
12 changes: 10 additions & 2 deletions tractor/trionics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@
Sugary patterns for trio + tractor designs.
'''
from ._mngrs import gather_contexts
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged
from ._mngrs import (
gather_contexts,
maybe_open_context,
)
from ._broadcast import (
broadcast_receiver,
BroadcastReceiver,
Lagged,
)


__all__ = [
'gather_contexts',
'broadcast_receiver',
'BroadcastReceiver',
'Lagged',
'maybe_open_context',
]
125 changes: 123 additions & 2 deletions tractor/trionics/_mngrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,25 @@
Async context manager primitives with hard ``trio``-aware semantics
'''
from typing import AsyncContextManager, AsyncGenerator
from typing import TypeVar, Sequence
from contextlib import asynccontextmanager as acm
from typing import (
Any,
AsyncContextManager,
AsyncGenerator,
Hashable,
Optional,
Sequence,
TypeVar,
)

import trio
from trio_typing import TaskStatus

from ..log import get_logger
from .._state import current_actor


log = get_logger(__name__)

# A regular invariant generic type
T = TypeVar("T")
Expand Down Expand Up @@ -76,3 +89,111 @@ async def gather_contexts(
# we don't need a try/finally since cancellation will be triggered
# by the surrounding nursery on error.
parent_exit.set()


# Per actor task caching helpers.
# Further potential examples of interest:
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8

class cache:
'''
Globally (processs wide) cached, task access to a
kept-alive-while-in-use async resource.
'''
lock = trio.Lock()
users: int = 0
values: dict[Any, Any] = {}
resources: dict[
int,
Optional[tuple[trio.Nursery, trio.Event]]
] = {}
no_more_users: Optional[trio.Event] = None

@classmethod
async def run_ctx(
cls,
mng,
key,
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,

) -> None:
async with mng as value:

_, no_more_users = cls.resources[id(mng)]
cls.values[key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
value = cls.values.pop(key)
# discard nursery ref so it won't be re-used (an error)
cls.resources.pop(id(mng))


@acm
async def maybe_open_context(

key: Hashable,
mngr: AsyncContextManager[T],

) -> (bool, T):
'''
Maybe open a context manager if there is not already a cached
version for the provided ``key``. Return the cached instance on
a cache hit.
'''
await cache.lock.acquire()

ctx_key = id(mngr)

value = None
try:
# lock feed acquisition around task racing / ``trio``'s
# scheduler protocol
value = cache.values[key]
log.info(f'Reusing cached resource for {key}')
cache.users += 1
cache.lock.release()
yield True, value

except KeyError:
log.info(f'Allocating new resource for {key}')

# **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed.

# TODO: avoid pulling from ``tractor`` internals and
# instead offer a "root nursery" in piker actors?
service_n = current_actor()._service_n

# TODO: does this need to be a tractor "root nursery"?
ln = cache.resources.get(ctx_key)
assert not ln

ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())

value = await ln.start(cache.run_ctx, mngr, key)
cache.users += 1
cache.lock.release()

yield False, value

finally:
cache.users -= 1

if cache.lock.locked():
cache.lock.release()

if value is not None:
# if no more consumers, teardown the client
if cache.users <= 0:
log.info(f'De-allocating resource for {key}')

# terminate mngr nursery
entry = cache.resources.get(ctx_key)
if entry:
_, no_more_users = entry
no_more_users.set()

0 comments on commit ef5162a

Please sign in to comment.