Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fspd cluster #252

Merged
merged 12 commits into from
Jan 25, 2022
128 changes: 5 additions & 123 deletions piker/_cacheables.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,18 @@
Cacheing apis and toolz.

"""
# further examples of interest:
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8

from collections import OrderedDict
from typing import (
Any,
Hashable,
Optional,
TypeVar,
AsyncContextManager,
)
from contextlib import (
asynccontextmanager,
)

import trio
from trio_typing import TaskStatus
import tractor
from tractor.trionics import maybe_open_context

from .brokers import get_brokermod
from .log import get_logger


T = TypeVar('T')
log = get_logger(__name__)


Expand Down Expand Up @@ -74,124 +62,18 @@ async def wrapper(*args):
return decorator


_cache: dict[str, 'Client'] = {} # noqa


class cache:
'''Globally (processs wide) cached, task access to a
kept-alive-while-in-use async resource.

'''
lock = trio.Lock()
users: int = 0
values: dict[Any, Any] = {}
resources: dict[
int,
Optional[tuple[trio.Nursery, trio.Event]]
] = {}
no_more_users: Optional[trio.Event] = None

@classmethod
async def run_ctx(
cls,
mng,
key,
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,

) -> None:
async with mng as value:

_, no_more_users = cls.resources[id(mng)]
cls.values[key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
value = cls.values.pop(key)
# discard nursery ref so it won't be re-used (an error)
cls.resources.pop(id(mng))


@asynccontextmanager
async def maybe_open_ctx(

key: Hashable,
mngr: AsyncContextManager[T],

) -> (bool, T):
'''Maybe open a context manager if there is not already a cached
version for the provided ``key``. Return the cached instance on
a cache hit.

'''

await cache.lock.acquire()

ctx_key = id(mngr)

value = None
try:
# lock feed acquisition around task racing / ``trio``'s
# scheduler protocol
value = cache.values[key]
log.info(f'Reusing cached resource for {key}')
cache.users += 1
cache.lock.release()
yield True, value

except KeyError:
log.info(f'Allocating new resource for {key}')

# **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed.

# TODO: avoid pulling from ``tractor`` internals and
# instead offer a "root nursery" in piker actors?
service_n = tractor.current_actor()._service_n

# TODO: does this need to be a tractor "root nursery"?
ln = cache.resources.get(ctx_key)
assert not ln

ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())

value = await ln.start(cache.run_ctx, mngr, key)
cache.users += 1
cache.lock.release()

yield False, value

finally:
cache.users -= 1

if cache.lock.locked():
cache.lock.release()

if value is not None:
# if no more consumers, teardown the client
if cache.users <= 0:
log.warning(f'De-allocating resource for {key}')

# terminate mngr nursery
entry = cache.resources.get(ctx_key)
if entry:
_, no_more_users = entry
no_more_users.set()


@asynccontextmanager
async def open_cached_client(
brokername: str,
) -> 'Client': # noqa
'''Get a cached broker client from the current actor's local vars.
'''
Get a cached broker client from the current actor's local vars.

If one has not been setup do it and cache it.

'''
brokermod = get_brokermod(brokername)
async with maybe_open_ctx(
key=brokername,
mngr=brokermod.get_client(),
async with maybe_open_context(
acm_func=brokermod.get_client,
) as (cache_hit, client):
yield client
3 changes: 1 addition & 2 deletions piker/data/_sharedmem.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,8 @@ def push(
return end

except ValueError as err:
# shoudl raise if diff detected
# should raise if diff detected
self.diff_err_fields(data)

raise err

def diff_err_fields(
Expand Down
43 changes: 27 additions & 16 deletions piker/data/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from typing import (
Any, Sequence,
AsyncIterator, Optional,
Awaitable, Callable,
)

import trio
Expand All @@ -37,7 +36,7 @@
from pydantic import BaseModel

from ..brokers import get_brokermod
from .._cacheables import maybe_open_ctx
from .._cacheables import maybe_open_context
from ..log import get_logger, get_console_log
from .._daemon import (
maybe_spawn_brokerd,
Expand Down Expand Up @@ -356,7 +355,10 @@ async def open_feed_bus(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
if tick_throttle:
n.cancel_scope.cancel()
bus._subscribers[symbol].remove(sub)
try:
bus._subscribers[symbol].remove(sub)
except ValueError:
log.warning(f'{sub} for {symbol} was already removed?')


@asynccontextmanager
Expand All @@ -368,12 +370,13 @@ async def open_sample_step_stream(
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with maybe_open_ctx(
key=delay_s,
mngr=portal.open_stream_from(
async with maybe_open_context(
acm_func=partial(
portal.open_stream_from,
iter_ohlc_periods,
delay_s=delay_s, # must be kwarg
),

kwargs={'delay_s': delay_s},
) as (cache_hit, istream):
if cache_hit:
# add a new broadcast subscription for the quote stream
Expand Down Expand Up @@ -520,7 +523,12 @@ async def open_feed(

) as (ctx, (init_msg, first_quotes)),

ctx.open_stream() as stream,
ctx.open_stream(
# XXX: be explicit about stream backpressure since we should
# **never** overrun on feeds being too fast, which will
# pretty much always happen with HFT XD
backpressure=True
) as stream,

):
# we can only read from shm
Expand Down Expand Up @@ -566,6 +574,7 @@ async def open_feed(

feed._max_sample_rate = max(ohlc_sample_rates)

# yield feed
try:
yield feed
finally:
Expand All @@ -590,17 +599,19 @@ async def maybe_open_feed(
'''
sym = symbols[0].lower()

async with maybe_open_ctx(
key=(brokername, sym),
mngr=open_feed(
brokername,
[sym],
loglevel=loglevel,
**kwargs,
),
async with maybe_open_context(
acm_func=open_feed,
kwargs={
'brokername': brokername,
'symbols': [sym],
'loglevel': loglevel,
'tick_throttle': kwargs.get('tick_throttle'),
},
key=sym,
) as (cache_hit, feed):

if cache_hit:
print('USING CACHED FEED')
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with feed.stream.subscribe() as bstream:
Expand Down
5 changes: 4 additions & 1 deletion piker/fsp/_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,13 @@ async def fsp_compute(
profiler(f'{func_name} pushed history')
profiler.finish()

# TODO: UGH, what is the right way to do something like this?
if not ctx._started_called:
await ctx.started(index)

# setup a respawn handle
with trio.CancelScope() as cs:
tracker = TaskTracker(trio.Event(), cs)
await ctx.started(index)
task_status.started((tracker, index))
profiler(f'{func_name} yield last index')

Expand Down
80 changes: 80 additions & 0 deletions piker/trionics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship of piker0)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
sugarz for trio/tractor conc peeps.

'''
from typing import AsyncContextManager
from typing import TypeVar
from contextlib import asynccontextmanager as acm

import trio


# A regular invariant generic type
T = TypeVar("T")


async def _enter_and_sleep(

mngr: AsyncContextManager[T],
to_yield: dict[int, T],
all_entered: trio.Event,
# task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,

) -> T:
'''Open the async context manager deliver it's value
to this task's spawner and sleep until cancelled.

'''
async with mngr as value:
to_yield[id(mngr)] = value

if all(to_yield.values()):
all_entered.set()

# sleep until cancelled
await trio.sleep_forever()


@acm
async def async_enter_all(

*mngrs: list[AsyncContextManager[T]],

) -> tuple[T]:

to_yield = {}.fromkeys(id(mngr) for mngr in mngrs)

all_entered = trio.Event()

async with trio.open_nursery() as n:
for mngr in mngrs:
n.start_soon(
_enter_and_sleep,
mngr,
to_yield,
all_entered,
)

# deliver control once all managers have started up
await all_entered.wait()
yield tuple(to_yield.values())

# tear down all sleeper tasks thus triggering individual
# mngr ``__aexit__()``s.
n.cancel_scope.cancel()
7 changes: 4 additions & 3 deletions piker/ui/_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ def _main(
piker_loglevel: str,
tractor_kwargs,
) -> None:
"""Sync entry point to start a chart app.
'''
Sync entry point to start a chart: a ``tractor`` + Qt runtime
entry point

"""
# ``tractor`` + Qt runtime entry point
'''
run_qtractor(
func=_async_main,
args=(sym, brokernames, piker_loglevel),
Expand Down
Loading