Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions data_generator/polygon_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import List

from vali_objects.vali_dataclasses.order import Order
from polygon.websocket import Market, EquityAgg, EquityTrade, CryptoTrade, ForexQuote, WebSocketClient, Feed
from polygon.websocket import Market, EquityAgg, EquityTrade, CryptoQuote, ForexQuote, WebSocketClient, Feed
from concurrent.futures import ThreadPoolExecutor, as_completed

from data_generator.base_data_service import BaseDataService, POLYGON_PROVIDER_NAME
Expand Down Expand Up @@ -204,7 +204,7 @@ def parse_price_for_forex(self, m, stats=None, is_ws=False) -> (float, float, fl
# stats['t_vlp'] = t_ms
return m.bid_price, m.ask_price, delta

async def handle_msg(self, msgs: List[ForexQuote | CryptoTrade | EquityAgg | EquityTrade]):
async def handle_msg(self, msgs: List[ForexQuote | CryptoQuote | EquityAgg | EquityTrade]):
"""
received message: CurrencyAgg(event_type='CAS', pair='USD/CHF', open=0.91313, close=0.91317, high=0.91318,
low=0.91313, volume=3, vwap=None, start_timestamp=1713273701000, end_timestamp=1713273702000,
Expand Down Expand Up @@ -248,12 +248,14 @@ def msg_to_price_sources(m, tp):
end_timestamp = None
open = close = vwap = high = low = m.price
elif tp.is_crypto:
if m.exchange != self.crypto_mapping['coinbase']:
#print(f"Skipping crypto trade from exchange {m.exchange} for {tp.trade_pair}")
return None, None
# if m.exchange_id != self.crypto_mapping['coinbase']:
# # print(f"Skipping crypto trade from exchange {m.exchange_id} for {tp.trade_pair}")
# return None, None
bid = m.bid_price
ask = m.ask_price
start_timestamp = round(m.received_timestamp, -3) # round to nearest second which allows aggresssive filtering via dup logic
end_timestamp = None
open = close = vwap = high = low = m.price
open = close = vwap = high = low = bid
else:
start_timestamp = m.start_timestamp
end_timestamp = m.end_timestamp - 1 # prioritize a new candle's open over a previous candle's close
Expand Down Expand Up @@ -306,7 +308,7 @@ def msg_to_price_sources(m, tp):
# print('received message:', m, type(m))
if isinstance(m, EquityAgg):
tp = self.symbol_to_trade_pair(m.symbol[2:]) # I:SPX -> SPX
elif isinstance(m, CryptoTrade):
elif isinstance(m, CryptoQuote):
tp = self.symbol_to_trade_pair(m.pair)
elif isinstance(m, ForexQuote):
tp = self.symbol_to_trade_pair(m.pair)
Expand Down Expand Up @@ -365,7 +367,7 @@ def _subscribe_websockets(self, tpc: TradePairCategory = None):
if tpc and tp.trade_pair_category != tpc:
continue
if tp.is_crypto:
symbol = "XT." + tp.trade_pair.replace('/', '-')
symbol = "XQ." + tp.trade_pair.replace('/', '-')
self.WEBSOCKET_OBJECTS[TradePairCategory.CRYPTO].subscribe(symbol)
subbed.append(symbol)
elif tp.is_forex:
Expand Down Expand Up @@ -498,7 +500,7 @@ def get_close_rest(

def trade_pair_to_polygon_ticker(self, trade_pair: TradePair):
if trade_pair.is_crypto:
return 'X:' + trade_pair.trade_pair_id
return 'X:' + trade_pair.trade_pair.replace('/', '-')
elif trade_pair.is_forex:
return 'C:' + trade_pair.trade_pair_id
elif trade_pair.is_indices:
Expand Down Expand Up @@ -819,6 +821,12 @@ def _get_filtered_forex_second_data():
else:
raise Exception(f'Invalid timespan {timespan}')
return ans
elif trade_pair.is_crypto and timespan == 'second':
ans, n = _get_filtered_forex_second_data()
if ans:
return ans
else:
return _fetch_raw_polygon_aggs()
else:
return _fetch_raw_polygon_aggs()

Expand Down Expand Up @@ -885,7 +893,7 @@ def get_quote(self, trade_pair: TradePair, processed_ms: int) -> (float, float,
if self.POLYGON_CLIENT is None:
self.instantiate_not_pickleable_objects()

if trade_pair.is_forex or trade_pair.is_equities:
if trade_pair.is_forex or trade_pair.is_equities or trade_pair.is_crypto:
polygon_ticker = self.trade_pair_to_polygon_ticker(trade_pair)
quotes = self.POLYGON_CLIENT.list_quotes(
ticker=polygon_ticker,
Expand All @@ -896,9 +904,7 @@ def get_quote(self, trade_pair: TradePair, processed_ms: int) -> (float, float,
)
for q in quotes:
return q.bid_price, q.ask_price, int(q.participant_timestamp/1_000_000) # convert ns back to ms
else:
# crypto
return 0, 0, 0
return 0, 0, 0

def get_currency_conversion(self, trade_pair: TradePair=None, base: str=None, quote: str=None) -> float:
"""
Expand Down Expand Up @@ -958,4 +964,4 @@ def get_currency_conversion(self, trade_pair: TradePair=None, base: str=None, qu
aggs.append(a)

assert 0, aggs
"""
"""
220 changes: 220 additions & 0 deletions data_generator/tardis_data_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import time
import traceback
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

import bittensor as bt
from tardis_client import TardisClient, Channel

from data_generator.base_data_service import BaseDataService
from time_util.time_util import TimeUtil
from vali_objects.vali_config import TradePair, TradePairCategory
from vali_objects.vali_dataclasses.price_source import PriceSource

TARDIS_PROVIDER_NAME = "Tardis"

class TardisDataService(BaseDataService):
def __init__(self, api_key, ipc_manager=None):
self.init_time = time.time()
self._api_key = api_key
self.tardis_client = None

super().__init__(provider_name=TARDIS_PROVIDER_NAME, ipc_manager=ipc_manager)

# Mapping of trade pairs to exchange and symbol
self.trade_pair_to_exchange_map = {
TradePair.BTCUSD: ("binance", "btcusdt"),
TradePair.ETHUSD: ("binance", "ethusdt"),
TradePair.SOLUSD: ("binance", "solusdt"),
TradePair.XRPUSD: ("binance", "xrpusdt"),
TradePair.DOGEUSD: ("binance", "dogeusdt"),
}

def instantiate_not_pickleable_objects(self):
"""Instantiate the TardisClient object when needed."""
self.tardis_client = TardisClient(api_key=self._api_key)

async def get_market_data(self, trade_pair: TradePair, timestamp_ms: int) -> Optional[Tuple[float, float]]:
"""
Fetch historical market data (bid/ask) for the given trade pair at the specified timestamp.

Args:
trade_pair: The cryptocurrency trade pair
timestamp_ms: Timestamp in milliseconds

Returns:
Tuple containing (bid, ask) prices or None if data couldn't be fetched
"""
if self.tardis_client is None:
self.instantiate_not_pickleable_objects()

if trade_pair not in self.trade_pair_to_exchange_map:
bt.logging.warning(f"No exchange mapping found for {trade_pair.trade_pair}")
return None

exchange, symbol = self.trade_pair_to_exchange_map[trade_pair]

# Convert timestamp to datetime
timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000)

# Create a time window (15 seconds before and after the timestamp)
from_date = timestamp_dt - timedelta(seconds=15)
to_date = timestamp_dt + timedelta(seconds=15)

try:
# Get book snapshot data from tardis
messages = []

async for local_timestamp, message in self.tardis_client.replay(
exchange=exchange,
from_date=from_date.strftime('%Y-%m-%dT%H:%M:%S'),
to_date=to_date.strftime('%Y-%m-%dT%H:%M:%S'),
filters=[Channel(name="bookTicker", symbols=[symbol])]
):
messages.append(message)

if not messages:
bt.logging.warning(f"No data found for {trade_pair.trade_pair} at {from_date.strftime('%Y-%m-%d')}-{to_date.strftime('%Y-%m-%d')}")
return None

# # Find the closest message to our target timestamp
# closest_message = None
# min_time_diff = float('inf')
#
# for message in messages:
# if 'data' in message and 'b' in message['data'] and 'a' in message['data']:
# message_time = datetime.fromtimestamp(message['localTimestamp'] / 1000)
# time_diff = abs((message_time - timestamp_dt).total_seconds())
#
# if time_diff < min_time_diff:
# min_time_diff = time_diff
# closest_message = message
closest_message = messages[len(messages)//2]

if closest_message and 'data' in closest_message:
data = closest_message['data']

# Extract the best bid and ask
if 'b' in data and len(data['b']) > 0 and 'a' in data and len(data['a']) > 0:
best_bid = float(data['b'])
best_ask = float(data['a'])
return best_bid, best_ask

return None

except Exception as e:
bt.logging.error(f"Error fetching data from Tardis for {trade_pair.trade_pair}: {e}")
bt.logging.error(traceback.format_exc())
return None

def get_close_rest(self, trade_pair: TradePair, timestamp_ms: int = None) -> Optional[PriceSource]:
"""
Get a PriceSource object with bid/ask data for the given trade pair at the specified timestamp.
Used for REST API operations.

Args:
trade_pair: The trade pair to get data for
timestamp_ms: The timestamp in milliseconds to get data for (defaults to now)

Returns:
PriceSource object or None if data couldn't be fetched
"""
if not trade_pair.is_crypto:
return None

if timestamp_ms is None:
timestamp_ms = TimeUtil.now_in_millis()

# Run the async function to get the market data
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
bid_ask = loop.run_until_complete(self.get_market_data(trade_pair, timestamp_ms))
loop.close()
except Exception as e:
bt.logging.error(f"Error running async loop for {trade_pair.trade_pair}: {e}")
return None

if not bid_ask:
return None

bid, ask = bid_ask
mid_price = (bid + ask) / 2

now_ms = TimeUtil.now_in_millis()
return PriceSource(
source=f'{TARDIS_PROVIDER_NAME}_rest',
timespan_ms=0,
open=mid_price,
close=mid_price,
vwap=mid_price,
high=mid_price,
low=mid_price,
start_ms=timestamp_ms,
websocket=False,
lag_ms=now_ms - timestamp_ms,
bid=bid,
ask=ask
)

def get_closes_rest(self, trade_pairs: List[TradePair]) -> dict:
"""
Get close prices for multiple trade pairs at once.

Args:
trade_pairs: List of trade pairs to get data for

Returns:
Dictionary mapping trade pairs to their PriceSource objects
"""
result = {}
for trade_pair in trade_pairs:
if trade_pair.is_crypto:
price_source = self.get_close_rest(trade_pair)
if price_source:
result[trade_pair] = price_source
return result

def _create_websocket_client(self, tpc: TradePairCategory):
"""Not implemented for Tardis as we use REST API for historical data."""
pass

def _subscribe_websockets(self, tpc: TradePairCategory):
"""Not implemented for Tardis as we use REST API for historical data."""
pass

async def handle_msg(self, msg):
"""Not implemented for Tardis as we use REST API for historical data."""
pass

def get_quote(self, trade_pair: TradePair, processed_ms: int) -> Tuple[float, float, int]:
"""
Get bid and ask quotes for a trade pair at a specific time.

Args:
trade_pair: The trade pair to get quotes for
processed_ms: The timestamp in milliseconds

Returns:
Tuple of (bid, ask, timestamp_ms)
"""
if not trade_pair.is_crypto:
return 0, 0, 0

# Run the async function to get the market data
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
bid_ask = loop.run_until_complete(self.get_market_data(trade_pair, processed_ms))
loop.close()
except Exception as e:
bt.logging.error(f"Error running async loop for quote {trade_pair.trade_pair}: {e}")
return 0, 0, 0

if not bid_ask:
return 0, 0, 0

bid, ask = bid_ask
return bid, ask, processed_ms
8 changes: 4 additions & 4 deletions data_generator/tiingo_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,13 @@ def msg_to_price_sources(m:dict, tp:TradePair) -> PriceSource | None:

open = vwap = high = low = bid_price
elif tp.is_crypto:
mode, ticker, date_str, exchange, volume, price = data
mode, ticker, date_str, exchange, bid_size, bid_price, mid_price, ask_size, ask_price = data
start_timestamp = TimeUtil.parse_iso_to_ms(date_str)
start_timestamp = round(start_timestamp, -3) # round to nearest second which allows aggresssive filtering via dup logic
if mode != 'T':
print(f'Skipping crypto due to non-T mode {m}')
if mode != 'Q':
print(f'Skipping crypto due to non-Q mode {m}')
return None
open = vwap = high = low = price
open = vwap = high = low = bid_price

elif tp.is_equities:
(mode, date_str, timestamp_ns, ticker, bid_size, bid_price, mid_price, ask_price, ask_size, last_price,
Expand Down
Loading
Loading