Dynamic Subgraphs with Adapters #379

Closed
Roh-codeur opened this issue Nov 7, 2024 · 12 comments
Labels
adapter: general (Issues and PRs related to input/output adapters in general)
type: feature (Issues and PRs related to new features)

Comments

@Roh-codeur

Hi, can you please help with the below, ta!

Describe the bug
I recall that AdapterManagers are not yet supported in DynamicGraphManager.

To Reproduce
I am trying to combine the examples: https://github.com/Point72/csp/blob/main/examples/06_advanced/e1_dynamic.py and https://github.com/Point72/csp/blob/main/examples/04_writing_adapters/e5_adaptermanager_pushinput.py as below:

  1. Create adapter manager
  2. Inside a sub-graph, I am hoping I can use the updates from the adapter
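# MyData and MyAdapterManager are defined in e5_adaptermanager_pushinput.py (the second example linked above)
from datetime import datetime, timedelta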

import csp
from csp import ts


class Order(csp.Struct):
    symbol: str
    size: int
    price: float


@csp.graph
def process_symbol(symbol: str, order: ts[Order], initial_order: Order, timer: ts[MyData], scalar: str) -> ts[int]:
    print("Starting sub-graph to process symbol ", symbol, " with initial order: ", initial_order, " scalar: ", scalar)

    csp.print(symbol + " orders", order)
    csp.print(symbol + " timer", timer)

    cum_size = csp.accum(timer.value)
    return cum_size


@csp.node
def process_results(x: {ts[str]: ts[int]}):
    if csp.ticked(x):
        print(csp.now(), "cum_sizes:", dict(x.tickeditems()))


@csp.graph
def main_graph():
    # We have a stream of incoming orders to deal with; we don't know the symbols up front
    adapter_manager = MyAdapterManager(timedelta(seconds=0.75))
    symbols = ["AAPL", "IBM", "TSLA", "GS", "JPM"]

    orders = csp.curve(
        Order,
        [
            (timedelta(seconds=0), Order(symbol="AAPL", price=135, size=100)),
            (timedelta(seconds=1), Order(symbol="FB", price=350, size=-200)),
            (timedelta(seconds=2), Order(symbol="GME", price=210, size=1000)),
            (timedelta(seconds=3), Order(symbol="AAPL", price=138, size=-100)),
            (timedelta(seconds=4), Order(symbol="FB", price=330, size=100)),
            (timedelta(seconds=5), Order(symbol="AMC", price=57, size=400)),
            (timedelta(seconds=6), Order(symbol="GME", price=200, size=800)),
        ],
    )

    # Get a dynamic basket keyed by symbol
    trigger = csp.dynamic_demultiplex(orders, orders.symbol)

    some_ts = csp.count(csp.timer(timedelta(seconds=1)))
    some_scalar = "howdy"

    # dynamic graphs
    cum_sizes = csp.dynamic(
        trigger,
        process_symbol,
        csp.snapkey(),  # csp.snapkey() provides the key that triggers a new dynamic graph as a scalar argument
        csp.attach(),  # csp.attach() will pass the corresponding timeseries of the key for the graph instance
        csp.snap(
            orders
        ),  # csp.snap will provide the current value of the given timeseries at the time of dynamic graph instantiation
        adapter_manager.subscribe("GS"),  # regular time series can be passed along, which will be shared across all instances
        some_scalar,  # regular scalar values can be passed as arguments to the sub-graph as well
    )

    # cum_sizes is a dynamic basket of results, keyed by the trigger keys
    process_results(cum_sizes)


def main():
    csp.run(main_graph, starttime=datetime.utcnow().replace(microsecond=0), endtime=timedelta(seconds=10))


if __name__ == "__main__":
    main()

Expected behavior
I was hoping I could use the adapter this way with the dynamic graph manager. In the future, the adapter would subscribe to all symbols and I would be able to filter for the pertinent symbol.

Error Message
No error message, just no output from:

csp.print(symbol + " timer", timer) 

Runtime Environment

>>> import sys, csp; print(csp.__version__); print(sys.version); print(sys.platform)
0.0.5
3.10.14 | packaged by Anaconda, Inc. | (main, May  6 2024, 19:44:50) [MSC v.1916 64 bit (AMD64)]


@timkpaine timkpaine added the tag: question Questions about use, potential features, or improvements label Nov 7, 2024
@timkpaine
Member

Adapters are not supported with dynamic; this is part of the motivation for the new pattern in #277.

@Roh-codeur
Author

Thanks @timkpaine. Can you please suggest a pattern I could follow for the below?

  1. on startup, I have a list of tickers
  2. I create a sub-graph for each of these tickers
  3. I would like to subscribe to market data for these tickers and feed to sub-graph

@timkpaine
Member

timkpaine commented Nov 7, 2024

In contrast with some other csp developers, I tend to be a "fat pipe" kind of guy. Here is how I would do it (and how I do do it in other places):

I will note that if you know all tickers on startup, you don't need to use dynamic. You could do, e.g.:

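from typing import List

import csp


@csp.graph
def graph(tickers: List[str]):
    # MarketData stands in for your own adapter manager; one static subscription per ticker
    subscriptions = {}
    for ticker in tickers:
        subscriptions[ticker] = MarketData.subscribe(ticker)
    ...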
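
One way to read the "fat pipe" idea mentioned in this comment is as a single shared feed carrying ticks for every symbol, with each consumer filtering for the symbols it cares about. The plain-Python sketch below shows only that routing idea. It is not csp code, and `route_by_symbol`, the `(symbol, price)` tuple layout, and the sample prices are all made up for illustration.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical message layout: (symbol, price)
Tick = Tuple[str, float]


def route_by_symbol(feed: Iterable[Tick], tickers: List[str]) -> Dict[str, List[float]]:
    """Consume one shared feed and fan its ticks out into per-ticker buckets."""
    wanted = set(tickers)
    per_ticker: Dict[str, List[float]] = {t: [] for t in tickers}
    for symbol, price in feed:
        # Ticks for symbols nobody asked for are simply dropped
        if symbol in wanted:
            per_ticker[symbol].append(price)
    return per_ticker


# A single "fat" feed with ticks for several symbols (illustrative values)
feed = [("GS", 500.0), ("IBM", 210.0), ("GS", 501.5), ("FB", 330.0)]
routed = route_by_symbol(feed, ["GS", "IBM"])
print(routed)
```

In a csp graph the same split would be done on time series rather than lists, for example with the `csp.dynamic_demultiplex` call used in the original example.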

@Roh-codeur
Author

Thanks mate.

  1. Looks like the PR for that issue is being worked on, thanks for that: Add dynamic endpoints to WebsocketAdapter #378. Do you think I can do something before it's merged?
  2. I am working with futures, so I do know the full list of contracts. However, at any given point in time only a subset of contracts is active. In backtesting, I am only trying to create sub-graphs for the contracts active at graph time, hence my approach with dynamic graphs. Is there another way to achieve this?

ta

@timkpaine
Member

  1. Looks like the PR for that Issue is being worked on. thanks for that. [WIP] Add dynamic endpoints to WebsocketAdapter #378. do you think I can do something before its merged?

We are starting with the websocket and HTTP adapters as a proof of concept of the new structure, to be used as a pattern for the forthcoming Redis adapter (and then we will likely go back and do the same for other adapter types, such as Kafka).

  2. I am working with Futures, so, I do indeed know the list of contracts. however, at a given point in time, only a set of contracts are active, in backtesting, I am only trying to create sub-graphs for a list of contracts active at graph time. hence my approach towards dynamic graphs. is there another way to achieve this please?

I don't think there is a timeline for dynamic adapters right now

@Roh-codeur
Author

Roh-codeur commented Nov 7, 2024

Got it, thanks mate. I think I will start off with static sub-graphs for now, for all given contracts. To minimize the graph size (number of sub-graphs), I will run shorter time periods. This shouldn't be an issue when realtime=True.

Meanwhile, I look forward to the dynamic adapter changes. Thanks again for looking into that. If you do think of a potential solution, please let me know.

ta
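
The plan described in this comment (static sub-graphs, kept small by running shorter periods) needs the list of relevant contracts before the graph is built. Below is a minimal plain-Python sketch of that selection step, assuming each contract has a known first and last trade date. The `Contract` class, the `active_contracts` helper, the symbols, and the dates are hypothetical placeholders and not part of csp.

```python
from dataclasses import dataclass
from datetime import date
from typing import List


@dataclass(frozen=True)
class Contract:
    symbol: str
    first_trade: date
    last_trade: date


def active_contracts(contracts: List[Contract], start: date, end: date) -> List[str]:
    """Return symbols whose trading life overlaps the [start, end] backtest window."""
    return [c.symbol for c in contracts if c.first_trade <= end and c.last_trade >= start]


# Illustrative contract calendar; the dates are invented for this example
contracts = [
    Contract("ESZ4", date(2023, 12, 15), date(2024, 12, 20)),
    Contract("ESH5", date(2024, 3, 15), date(2025, 3, 21)),
    Contract("ESM5", date(2024, 6, 21), date(2025, 6, 20)),
]

# Only contracts alive at some point in January 2025 are kept
tickers = active_contracts(contracts, date(2025, 1, 2), date(2025, 1, 31))
print(tickers)
```

The resulting list could then be passed as the `tickers` argument of a static graph like the one suggested earlier in the thread, so that each backtest run only builds sub-graphs for contracts that can actually tick in that window.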

@timkpaine
Member

I'm going to keep this open for now, as I realize we don't have an external ticket for dynamic adapters.

@timkpaine timkpaine reopened this Nov 7, 2024
@timkpaine timkpaine added type: feature Issues and PRs related to new features adapter: general Issues and PRs related to input/output adapters in general and removed tag: question Questions about use, potential features, or improvements labels Nov 7, 2024
@timkpaine timkpaine changed the title Adapter with Dynamic Manager Dynamic Subgraphs with Adapters Nov 7, 2024
@robambalu
Collaborator

This particular example looks like it should work though, since the adapter is created and subscribed to outside of the dynamic graph. The limitation at the moment (perhaps forever) is that you cannot instantiate adapter managers in a dynamic graph, but I don't see that happening in this example.

@robambalu
Collaborator

If you can include the code for MyAdapterManager and MyData, I can run this and see what's happening.

@Roh-codeur
Author

@robambalu hey mate, sure, I am trying to combine examples in the examples folder:

https://github.com/Point72/csp/blob/main/examples/06_advanced/e1_dynamic.py and https://github.com/Point72/csp/blob/main/examples/04_writing_adapters/e5_adaptermanager_pushinput.py

"""
This example introduces the concept of an AdapterManager for realtime data. AdapterManagers are constructs that are used
when you have a shared input or output resources (ie single CSV / Parquet file, some pub/sub session, etc)
that you want to connect to once, but provide data to/from many input/output adapters (aka time series)
"""

import random
import threading
import time
from datetime import datetime, timedelta

import csp
from csp import ts
from csp.impl.adaptermanager import AdapterManagerImpl
from csp.impl.pushadapter import PushInputAdapter
from csp.impl.wiring import py_push_adapter_def


class MyData(csp.Struct):
    symbol: str
    value: int


# This object represents our AdapterManager at graph time. It describes the manager's properties
# and will be used to create the actual impl when its time to build the engine
class MyAdapterManager:
    def __init__(self, interval: timedelta):
        """
        Normally one would pass properties of the manager here, ie filename,
        message bus, etc
        """
        print("MyAdapterManager::__init__")
        self._interval = interval

    def subscribe(self, symbol, push_mode=csp.PushMode.NON_COLLAPSING):
        """User facing API to subscribe to a timeseries stream from this adapter manager"""
        # This will return a graph-time timeseries edge representing an edge from this
        # adapter manager for the given symbol / arguments
        return MyPushAdapter(self, symbol, push_mode=push_mode)

    def _create(self, engine, memo):
        """This method will get called at engine build time, at which point the graph time manager representation
        will create the actual impl that will be used for runtime
        """
        print("MyAdapterManager::_create")
        # Normally you would pass the arguments down into the impl here
        return MyAdapterManagerImpl(engine, self._interval)


# This is the actual manager impl that will be created and executed during runtime
class MyAdapterManagerImpl(AdapterManagerImpl):
    def __init__(self, engine, interval):
        print("MyAdapterManagerImpl::__init__")
        super().__init__(engine)

        # These are just used to simulate a data source
        self._interval = interval
        self._counter = 0

        # We will keep track of requested input adapters here
        self._inputs = {}

        # Our driving thread: all realtime adapters need a separate thread of execution that
        # drives data into the engine thread
        self._running = False
        self._thread = None

    def start(self, starttime, endtime):
        """start will get called at the start of the engine run. At this point
        one would start up the realtime data source / spawn the driving thread(s) and
         subscribe to the needed data"""
        print("MyAdapterManagerImpl::start")
        self._running = True
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def stop(self):
        """This will be called at the end of the engine run, at which point resources should be
        closed and cleaned up"""
        print("MyAdapterManagerImpl::stop")
        if self._running:
            self._running = False
            self._thread.join()

    def register_input_adapter(self, symbol, adapter):
        """Actual PushInputAdapters will self register when they are created as part of the engine
        This is the place we gather all requested input adapters and their properties
        """
        if symbol not in self._inputs:
            self._inputs[symbol] = []
        # Keep a list of adapters by key in case we get duplicate adapters ( should be memoized in reality )
        self._inputs[symbol].append(adapter)

    def process_next_sim_timeslice(self, now):
        """This method is only used by simulated / historical adapters, for realtime we just return None"""
        return None

    def _run(self):
        """Our driving thread, in reality this will be reacting to external events, parsing the data and
        pushing it into the respective adapter
        """
        symbols = list(self._inputs.keys())
        while self._running:
            # Lets pick a random symbol from the requested symbols
            symbol = symbols[random.randint(0, len(symbols) - 1)]
            adapters = self._inputs[symbol]
            data = MyData(symbol=symbol, value=self._counter)
            self._counter += 1
            for adapter in adapters:
                adapter.push_tick(data)

            time.sleep(self._interval.total_seconds())


# The Impl object is created at runtime when the graph is converted into the runtime engine;
# it does not exist at graph building time. A managed push adapter impl will get the
# adapter manager runtime impl as its first argument
class MyPushAdapterImpl(PushInputAdapter):
    def __init__(self, manager_impl, symbol):
        print(f"MyPushAdapterImpl::__init__ {symbol}")
        manager_impl.register_input_adapter(symbol, self)
        super().__init__()


MyPushAdapter = py_push_adapter_def("MyPushAdapter", MyPushAdapterImpl, ts[MyData], MyAdapterManager, symbol=str)

@robambalu
Collaborator

Thanks. As expected, this is all working fine. The only reason you aren't seeing the ticks is that you are using a push adapter while running the engine in simulation mode.
If you pass realtime=True to your csp.run call, you will see the ticks.
More information on push/pull adapters can be found here:
https://github.com/Point72/csp/wiki/Write-Historical-Input-Adapters
https://github.com/Point72/csp/wiki/Write-Realtime-Input-Adapters

@Roh-codeur
Author

got it, thanks
