diff --git a/CHANGES.md b/CHANGES.md index 673bbe7a6..653a87874 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,8 @@ * Bugfix: Handle empty nextFundingRate in OKX * Bugfix: Handle null next_funding_time and estimated_rate in HuobiSwap funding * Update: transitioned from Coinbase Pro (retired) to Coinbase Advanced Trade + * Feature: Bybit spot support + * Update: Bybit migrate to API V5 for public streams ### 2.4.0 (2024-01-07) * Update: Fix tests diff --git a/cryptofeed/exchanges/bybit.py b/cryptofeed/exchanges/bybit.py index b52e4ac72..70cf57753 100644 --- a/cryptofeed/exchanges/bybit.py +++ b/cryptofeed/exchanges/bybit.py @@ -12,14 +12,14 @@ from decimal import Decimal from typing import Dict, Tuple, Union from datetime import datetime as dt +import re from yapic import json from cryptofeed.connection import AsyncConnection, RestEndpoint, Routes, WebsocketEndpoint -from cryptofeed.defines import BID, ASK, BUY, BYBIT, CANCELLED, CANCELLING, CANDLES, FAILED, FILLED, FUNDING, L2_BOOK, LIMIT, LIQUIDATIONS, MAKER, MARKET, OPEN, PARTIAL, SELL, SUBMITTING, TAKER, TRADES, OPEN_INTEREST, INDEX, ORDER_INFO, FILLS, FUTURES, PERPETUAL +from cryptofeed.defines import BID, ASK, BUY, BYBIT, CANCELLED, CANCELLING, CANDLES, FAILED, FILLED, FUNDING, L2_BOOK, LIMIT, LIQUIDATIONS, MAKER, MARKET, OPEN, PARTIAL, SELL, SUBMITTING, TAKER, TRADES, OPEN_INTEREST, INDEX, ORDER_INFO, FILLS, FUTURES, PERPETUAL, SPOT, TICKER from cryptofeed.feed import Feed -from cryptofeed.types import OrderBook, Trade, Index, OpenInterest, Funding, OrderInfo, Fill, Candle, Liquidation - +from cryptofeed.types import OrderBook, Trade, Index, OpenInterest, Funding, OrderInfo, Fill, Candle, Liquidation, Ticker LOG = logging.getLogger('feedhandler') @@ -27,25 +27,33 @@ class Bybit(Feed): id = BYBIT websocket_channels = { - L2_BOOK: 'orderBook_200.100ms', - TRADES: 'trade', + L2_BOOK: '', # Assigned in self.subscribe + TRADES: 'publicTrade', FILLS: 'execution', ORDER_INFO: 'order', - INDEX: 'instrument_info.100ms', - OPEN_INTEREST: 'instrument_info.100ms', - FUNDING: 'instrument_info.100ms', - CANDLES: 'klineV2', - LIQUIDATIONS: 'liquidation' + INDEX: 'index', + OPEN_INTEREST: 'open_interest', + FUNDING: 'funding', + CANDLES: 'kline', + LIQUIDATIONS: 'liquidation', + TICKER: 'tickers' } websocket_endpoints = [ - WebsocketEndpoint('wss://stream.bybit.com/realtime', channel_filter=(websocket_channels[L2_BOOK], websocket_channels[TRADES], websocket_channels[INDEX], websocket_channels[OPEN_INTEREST], websocket_channels[FUNDING], websocket_channels[CANDLES], websocket_channels[LIQUIDATIONS]), instrument_filter=('QUOTE', ('USD',)), sandbox='wss://stream-testnet.bybit.com/realtime', options={'compression': None}), - WebsocketEndpoint('wss://stream.bybit.com/realtime_public', channel_filter=(websocket_channels[L2_BOOK], websocket_channels[TRADES], websocket_channels[INDEX], websocket_channels[OPEN_INTEREST], websocket_channels[FUNDING], websocket_channels[CANDLES], websocket_channels[LIQUIDATIONS]), instrument_filter=('QUOTE', ('USDT',)), sandbox='wss://stream-testnet.bybit.com/realtime_public', options={'compression': None}), + WebsocketEndpoint('wss://stream.bybit.com/v5/public/linear', instrument_filter=('TYPE', (FUTURES, PERPETUAL)), channel_filter=(websocket_channels[L2_BOOK], websocket_channels[TRADES], websocket_channels[INDEX], websocket_channels[OPEN_INTEREST], websocket_channels[FUNDING], websocket_channels[CANDLES], websocket_channels[LIQUIDATIONS], websocket_channels[TICKER]), sandbox='wss://stream-testnet.bybit.com/v5/public/linear', options={'compression': None}), + WebsocketEndpoint('wss://stream.bybit.com/v5/public/spot', instrument_filter=('TYPE', (SPOT)), channel_filter=(websocket_channels[L2_BOOK], websocket_channels[TRADES], websocket_channels[CANDLES],), sandbox='wss://stream-testnet.bybit.com/v5/public/spot', options={'compression': None}), WebsocketEndpoint('wss://stream.bybit.com/realtime_private', channel_filter=(websocket_channels[ORDER_INFO], websocket_channels[FILLS]), instrument_filter=('QUOTE', ('USDT',)), sandbox='wss://stream-testnet.bybit.com/realtime_private', options={'compression': None}), ] - rest_endpoints = [RestEndpoint('https://api.bybit.com', routes=Routes('/v2/public/symbols'))] + rest_endpoints = [ + RestEndpoint('https://api.bybit.com', routes=Routes(['/v5/market/instruments-info?&category=linear&status=Trading&limit=1000', '/v5/market/instruments-info?&category=spot&status=Trading&limit=1000'])) + ] valid_candle_intervals = {'1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '1d', '1w', '1M'} candle_interval_map = {'1m': '1', '3m': '3', '5m': '5', '15m': '15', '30m': '30', '1h': '60', '2h': '120', '4h': '240', '6h': '360', '1d': 'D', '1w': 'W', '1M': 'M'} + # Bybit sends delta updates for futures, which might not include some values if they haven't changed. + # https://bybit-exchange.github.io/docs/v5/websocket/public/ticker + # Initialize the store to keep snapshots and update the data with deltas + tickers = {} + @classmethod def timestamp_normalize(cls, ts: Union[int, dt]) -> float: if isinstance(ts, int): @@ -53,28 +61,61 @@ def timestamp_normalize(cls, ts: Union[int, dt]) -> float: else: return ts.timestamp() + @staticmethod + def convert_to_spot_name(cls, pair): + # Bybit spot and USDT perps use the same symbol name. To distinguish them, use a slash to separate the base and quote. + if not re.findall(r"(USDT|USDC|EUR|BTC|ETH|DAI|BRZ)$", pair): + LOG.error("Quote currency not found in the trading pair %s", pair) + + return None + + return re.sub(r"(USDT|USDC|EUR|BTC|ETH|DAI|BRZ)$", r"/\1", pair) + @classmethod def _parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]: ret = {} info = defaultdict(dict) - # PERPETUAL & FUTURES - for symbol in data['result']: - base = symbol['base_currency'] - quote = symbol['quote_currency'] - - stype = PERPETUAL - expiry = None - if not symbol['name'].endswith(quote): - stype = FUTURES - year = symbol['name'].replace(base + quote, '')[-2:] - expiry = year + symbol['alias'].replace(base + quote, '')[-4:] - - s = Symbol(base, quote, type=stype, expiry_date=expiry) - - ret[s.normalized] = symbol['name'] - info['tick_size'][s.normalized] = symbol['price_filter']['tick_size'] - info['instrument_type'][s.normalized] = stype + for msg in data: + if isinstance(msg['result'], dict): + for symbol in msg['result']['list']: + + if 'contractType' not in symbol: + stype = SPOT + elif 'contractType' in symbol: + if symbol['contractType'] == 'LinearPerpetual': + stype = PERPETUAL + elif symbol['contractType'] == 'LinearFutures': + stype = FUTURES + + base = symbol['baseCoin'] + quote = symbol['quoteCoin'] + + expiry = None + + if stype is FUTURES: + if not symbol['symbol'].endswith(quote): + # linear futures + if '-' in symbol['symbol']: + expiry = symbol['symbol'].split('-')[-1] + + s = Symbol(base, quote, type=stype, expiry_date=expiry) + + # Bybit spot and USDT perps share the same symbol name, so + # here it is formed using the base and quote coins, separated + # by a slash. This is consistent with the UI. + # https://bybit-exchange.github.io/docs/v5/enum#symbol + if stype == SPOT: + ret[s.normalized] = f'{base}/{quote}' + elif stype == PERPETUAL and symbol['symbol'].endswith('PERP'): + ret[s.normalized] = symbol['symbol'] + elif stype == PERPETUAL: + ret[s.normalized] = f'{base}{quote}' + elif stype == FUTURES: + ret[s.normalized] = symbol['symbol'] + + info['tick_size'][s.normalized] = Decimal(symbol['priceFilter']['tickSize']) + info['instrument_type'][s.normalized] = stype return ret, info @@ -86,30 +127,40 @@ def __reset(self, conn: AsyncConnection): if std_pair in self._l2_book: del self._l2_book[std_pair] - self._instrument_info_cache = {} + self.tickers = {} - async def _candle(self, msg: dict, timestamp: float): - ''' + async def _candle(self, msg: dict, timestamp: float, market: str): + """ { - "topic": "klineV2.1.BTCUSD", //topic name - "data": [{ - "start": 1572425640, //start time of the candle - "end": 1572425700, //end time of the candle - "open": 9200, //open price - "close": 9202.5, //close price - "high": 9202.5, //max price - "low": 9196, //min price - "volume": 81790, //volume - "turnover": 8.889247899999999, //turnover - "confirm": False, //snapshot flag (indicates if candle is closed or not) - "cross_seq": 297503466, - "timestamp": 1572425676958323 //cross time - }], - "timestamp_e6": 1572425677047994 //server time + "topic": "kline.5.BTCPERP", + "data": [ + { + "start": 1671187800000, + "end": 1671188099999, + "interval": "5", + "open": "16991", + "close": "16980.5", + "high": "16991", + "low": "16980.5", + "volume": "2.501", + "turnover": "42493.2305", + "confirm": false, + "timestamp": 1671187815755 + } + ], + "ts": 1671187815755, + "type": "snapshot" } - ''' - symbol = self.exchange_symbol_to_std_symbol(msg['topic'].split(".")[-1]) - ts = msg['timestamp_e6'] / 1_000_000 + """ + symbol = msg['topic'].split(".")[-1] + if market == 'spot': + symbol = self.convert_to_spot_name(self, symbol) + if not symbol: + return + + symbol = self.exchange_symbol_to_std_symbol(symbol) + + ts = int(msg['ts']) for entry in msg['data']: if self.candle_closed_only and not entry['confirm']: @@ -133,13 +184,15 @@ async def _candle(self, msg: dict, timestamp: float): async def _liquidation(self, msg: dict, timestamp: float): ''' { - 'topic': 'liquidation.EOSUSDT', - 'data': { - 'symbol': 'EOSUSDT', - 'side': 'Buy', - 'price': '3.950', - 'qty': '58.0', - 'time': 1632586154956 + "topic": "liquidation.BTCUSDT", + "type": "snapshot", + "ts": 1703485237953, + "data": { + "updatedTime": 1703485237953, + "symbol": "BTCUSDT", + "side": "Sell", + "size": "0.003", + "price": "43511.70" } } ''' @@ -147,11 +200,11 @@ async def _liquidation(self, msg: dict, timestamp: float): self.id, self.exchange_symbol_to_std_symbol(msg['data']['symbol']), BUY if msg['data']['side'] == 'Buy' else SELL, - Decimal(msg['data']['qty']), + Decimal(msg['data']['size']), Decimal(msg['data']['price']), None, None, - self.timestamp_normalize(msg['data']['time']), + msg['ts'], raw=msg ) await self.callback(LIQUIDATIONS, liq, timestamp) @@ -160,30 +213,36 @@ async def message_handler(self, msg: str, conn, timestamp: float): msg = json.loads(msg, parse_float=Decimal) + # Bybit spot and USDT perps share the same symbol name, so to help to distinguish spot pairs from USDT perps, + # pick the market from the WebSocket address URL and pass it to the functions. + # 'linear' - futures, perpetual, 'spot' - spot + market = conn.address.split('/')[-1] if "success" in msg: if msg['success']: - if msg['request']['op'] == 'auth': - LOG.debug("%s: Authenticated successful", conn.uuid) - elif msg['request']['op'] == 'subscribe': - LOG.debug("%s: Subscribed to channels: %s", conn.uuid, msg['request']['args']) + if 'request' in msg: + if msg['request']['op'] == 'auth': + LOG.debug("%s: Authenticated successful", conn.uuid) + elif msg['op'] == 'subscribe': + # {"success": true, "ret_msg": "","op": "subscribe","conn_id": "cejreassvfrsfvb9v1a0-2m"} + LOG.debug("%s: Subscribed to channel.", conn.uuid) else: LOG.warning("%s: Unhandled 'successs' message received", conn.uuid) else: LOG.error("%s: Error from exchange %s", conn.uuid, msg) - elif msg["topic"].startswith('trade'): - await self._trade(msg, timestamp) - elif msg["topic"].startswith('orderBook'): - await self._book(msg, timestamp) + elif msg["topic"].startswith('publicTrade'): + await self._trade(msg, timestamp, market) + elif msg["topic"].startswith('orderbook'): + await self._book(msg, timestamp, market) + elif msg['topic'].startswith('kline'): + await self._candle(msg, timestamp, market) elif msg['topic'].startswith('liquidation'): await self._liquidation(msg, timestamp) - elif "instrument_info" in msg["topic"]: - await self._instrument_info(msg, timestamp) + elif msg['topic'].startswith('tickers'): + await self._ticker_open_interest_funding_index(msg, timestamp, conn) elif "order" in msg["topic"]: await self._order(msg, timestamp) elif "execution" in msg["topic"]: await self._execution(msg, timestamp) - elif 'klineV2' in msg['topic'] or 'candle' in msg['topic']: - await self._candle(msg, timestamp) # elif "position" in msg["topic"]: # await self._balances(msg, timestamp) else: @@ -191,18 +250,40 @@ async def message_handler(self, msg: str, conn, timestamp: float): async def subscribe(self, connection: AsyncConnection): self.__reset(connection) + + # Bybit does not offer separate channels for open interest, funding, and index price. + # Instead, it integrates this data into the 'tickers' channel. This approach de-duplicates pairs and + # subscribes them all at once to the 'tickers' channel. + tickers_pairs = [] + for chan, pairs in connection.subscription.items(): + if chan in [self.websocket_channels[TICKER], OPEN_INTEREST, FUNDING, INDEX]: + tickers_pairs += pairs + tickers_pairs = list(set(tickers_pairs)) + sub = [f"tickers.{pair}" for pair in tickers_pairs] + if sub: + await connection.write(json.dumps({"op": "subscribe", "args": sub})) + for chan in connection.subscription: if not self.is_authenticated_channel(self.exchange_channel_to_std(chan)): for pair in connection.subscription[chan]: sym = str_to_symbol(self.exchange_symbol_to_std_symbol(pair)) + if sym.type == SPOT: + pair = pair.replace('/', '') if self.exchange_channel_to_std(chan) == CANDLES: - c = chan if sym.quote == 'USD' else 'candle' - sub = [f"{c}.{self.candle_interval_map[self.candle_interval]}.{pair}"] + sub = [f"{self.websocket_channels[CANDLES]}.{self.candle_interval_map[self.candle_interval]}.{pair}"] + elif self.exchange_channel_to_std(chan) == L2_BOOK: + l2_book_channel = { + SPOT: "orderbook.200", + FUTURES: "orderbook.200", + PERPETUAL: "orderbook.200", + } + sub = [f"{l2_book_channel[sym.type]}.{pair}"] else: sub = [f"{chan}.{pair}"] - await connection.write(json.dumps({"op": "subscribe", "args": sub})) + if self.exchange_channel_to_std(chan) not in [self.websocket_channels[TICKER], OPEN_INTEREST, FUNDING, INDEX]: + await connection.write(json.dumps({"op": "subscribe", "args": sub})) else: await connection.write(json.dumps( { @@ -211,191 +292,208 @@ async def subscribe(self, connection: AsyncConnection): } )) - async def _instrument_info(self, msg: dict, timestamp: float): + async def _trade(self, msg: dict, timestamp: float, market: str): """ - ### Snapshot type update { - "topic": "instrument_info.100ms.BTCUSD", + "topic": "publicTrade.BTCUSDT", "type": "snapshot", - "data": { - "id": 1, - "symbol": "BTCUSD", //instrument name - "last_price_e4": 81165000, //the latest price - "last_tick_direction": "ZeroPlusTick", //the direction of last tick:PlusTick,ZeroPlusTick,MinusTick, - //ZeroMinusTick - "prev_price_24h_e4": 81585000, //the price of prev 24h - "price_24h_pcnt_e6": -5148, //the current last price percentage change from prev 24h price - "high_price_24h_e4": 82900000, //the highest price of prev 24h - "low_price_24h_e4": 79655000, //the lowest price of prev 24h - "prev_price_1h_e4": 81395000, //the price of prev 1h - "price_1h_pcnt_e6": -2825, //the current last price percentage change from prev 1h price - "mark_price_e4": 81178500, //mark price - "index_price_e4": 81172800, //index price - "open_interest": 154418471, //open interest quantity - Attention, the update is not - //immediate - slowest update is 1 minute - "open_value_e8": 1997561103030, //open value quantity - Attention, the update is not - //immediate - the slowest update is 1 minute - "total_turnover_e8": 2029370141961401, //total turnover - "turnover_24h_e8": 9072939873591, //24h turnover - "total_volume": 175654418740, //total volume - "volume_24h": 735865248, //24h volume - "funding_rate_e6": 100, //funding rate - "predicted_funding_rate_e6": 100, //predicted funding rate - "cross_seq": 1053192577, //sequence - "created_at": "2018-11-14T16:33:26Z", - "updated_at": "2020-01-12T18:25:16Z", - "next_funding_time": "2020-01-13T00:00:00Z", //next funding time - //the rest time to settle funding fee - "countdown_hour": 6 //the remaining time to settle the funding fee - }, - "cross_seq": 1053192634, - "timestamp_e6": 1578853524091081 //the timestamp when this information was produced - } + "ts": 1672304486868, + "data": [ + { + "T": 1672304486865, + "s": "BTCUSDT", + "S": "Buy", + "v": "0.001", + "p": "16578.50", + "L": "PlusTick", + "i": "20f43950-d8dd-5b31-9112-a178eb6023af", + "BT": false}]} + """ + data = msg['data'] + if isinstance(data, list): + for trade in data: + symbol = trade['s'] - ### Delta type update + if market == 'spot': + symbol = self.convert_to_spot_name(self, trade['s']) + if not symbol: + return + + ts = int(trade['T']) if isinstance(trade['T'], str) else trade['T'] + + t = Trade( + self.id, + self.exchange_symbol_to_std_symbol(symbol), + BUY if trade['S'] == 'Buy' else SELL, + Decimal(trade['v']), + Decimal(trade['p']), + self.timestamp_normalize(ts), + id=trade['i'], + raw=trade + ) + await self.callback(TRADES, t, timestamp) + + async def _book(self, msg: dict, timestamp: float, market: str): + ''' { - "topic": "instrument_info.100ms.BTCUSD", - "type": "delta", - "data": { - "delete": [], - "update": [ - { - "id": 1, - "symbol": "BTCUSD", - "prev_price_24h_e4": 81565000, - "price_24h_pcnt_e6": -4904, - "open_value_e8": 2000479681106, - "total_turnover_e8": 2029370495672976, - "turnover_24h_e8": 9066215468687, - "volume_24h": 735316391, - "cross_seq": 1053192657, - "created_at": "2018-11-14T16:33:26Z", - "updated_at": "2020-01-12T18:25:25Z" - } - ], - "insert": [] - }, - "cross_seq": 1053192657, - "timestamp_e6": 1578853525691123 + "topic": "orderbook.50.BTCUSDT", + "type": "snapshot", + "ts": 1672304484978, + "data": { + "s": "BTCUSDT", + "b": [ + ..., + [ + "16493.50", + "0.006" + ], + [ + "16493.00", + "0.100" + ] + ], + "a": [ + [ + "16611.00", + "0.029" + ], + [ + "16612.00", + "0.213" + ], + ..., + ], + "u": 18521288, + "seq": 7961638724 + } + "cts": 1672304484976 } - """ + ''' + pair = msg['topic'].split('.')[-1] update_type = msg['type'] + data = msg['data'] + delta = {BID: [], ASK: []} + + if market == 'spot': + pair = self.convert_to_spot_name(self, data['s']) + if not pair: + return + + pair = self.exchange_symbol_to_std_symbol(pair) if update_type == 'snapshot': - updates = [msg['data']] - else: - updates = msg['data']['update'] + delta = None + self._l2_book[pair] = OrderBook(self.id, pair, max_depth=self.max_depth) - for info in updates: - if msg['topic'] in self._instrument_info_cache and self._instrument_info_cache[msg['topic']] == updates: - continue - else: - self._instrument_info_cache[msg['topic']] = updates + for key, update in data.items(): + side = BID if key == 'b' else ASK + if key == 'a' or key == 'b': + for price, size in update: - ts = int(msg['timestamp_e6']) / 1_000_000 + price = Decimal(price) + size = Decimal(size) - if 'open_interest' in info: - oi = OpenInterest( - self.id, - self.exchange_symbol_to_std_symbol(info['symbol']), - Decimal(info['open_interest']), - ts, - raw=info - ) - await self.callback(OPEN_INTEREST, oi, timestamp) + if size == 0: + if price in self._l2_book[pair].book[side]: + del self._l2_book[pair].book[side][price] + else: + self._l2_book[pair].book[side][price] = size - if 'index_price_e4' in info: - i = Index( - self.id, - self.exchange_symbol_to_std_symbol(info['symbol']), - Decimal(info['index_price_e4']) * Decimal('1e-4'), - ts, - raw=info - ) - await self.callback(INDEX, i, timestamp) + if update_type == 'delta': + delta = {BID: data['b'], ASK: data['a']} - if 'funding_rate_e6' in info: - f = Funding( - self.id, - self.exchange_symbol_to_std_symbol(info['symbol']), - None, - Decimal(info['funding_rate_e6']) * Decimal('1e-6'), - info['next_funding_time'].timestamp() if 'next_funding_time' in info else None, - ts, - predicted_rate=Decimal(info['predicted_funding_rate_e6']) * Decimal('1e-6'), - raw=info - ) - await self.callback(FUNDING, f, timestamp) + await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, timestamp=int(msg['ts']), raw=msg, delta=delta) - async def _trade(self, msg: dict, timestamp: float): - """ - {"topic":"trade.BTCUSD", - "data":[ - { - "timestamp":"2019-01-22T15:04:33.461Z", - "symbol":"BTCUSD", - "side":"Buy", - "size":980, - "price":3563.5, - "tick_direction":"PlusTick", - "trade_id":"9d229f26-09a8-42f8-aff3-0ba047b0449d", - "cross_seq":163261271}]} - """ - data = msg['data'] - for trade in data: - if isinstance(trade['trade_time_ms'], str): - ts = int(trade['trade_time_ms']) - else: - ts = trade['trade_time_ms'] + async def _ticker_open_interest_funding_index(self, msg: dict, timestamp: float, conn: AsyncConnection): + ''' + { + "topic": "tickers.BTCUSDT", + "type": "snapshot", + "data": { + "symbol": "BTCUSDT", + "tickDirection": "PlusTick", + "price24hPcnt": "0.017103", + "lastPrice": "17216.00", + "prevPrice24h": "16926.50", + "highPrice24h": "17281.50", + "lowPrice24h": "16915.00", + "prevPrice1h": "17238.00", + "markPrice": "17217.33", + "indexPrice": "17227.36", + "openInterest": "68744.761", + "openInterestValue": "1183601235.91", + "turnover24h": "1570383121.943499", + "volume24h": "91705.276", + "nextFundingTime": "1673280000000", + "fundingRate": "-0.000212", + "bid1Price": "17215.50", + "bid1Size": "84.489", + "ask1Price": "17216.00", + "ask1Size": "83.020" + }, + "cs": 24987956059, + "ts": 1673272861686 + } + ''' + + # Bybit does not provide bid/ask information for the spot market, only for perps at the moment + update_type = msg['type'] + update = msg['data'] + _pair = msg['data']['symbol'] + symbol = self.exchange_symbol_to_std_symbol(_pair) + + if update_type == 'snapshot': + self.tickers[symbol] = update + + if update_type == 'delta': + self.tickers[symbol].update(update) + update = self.tickers[symbol] - t = Trade( + if 'tickers' in conn.subscription and _pair in conn.subscription['tickers']: + t = Ticker( self.id, - self.exchange_symbol_to_std_symbol(trade['symbol']), - BUY if trade['side'] == 'Buy' else SELL, - Decimal(trade['size']), - Decimal(trade['price']), - self.timestamp_normalize(ts), - id=trade['trade_id'], - raw=trade + symbol, + Decimal(update['bid1Price']) if 'bid1Price' in update else Decimal(0), + Decimal(update['ask1Price']) if 'ask1Price' in update else Decimal(0), + int(msg['ts']), + raw=update ) - await self.callback(TRADES, t, timestamp) + await self.callback(TICKER, t, timestamp) - async def _book(self, msg: dict, timestamp: float): - pair = self.exchange_symbol_to_std_symbol(msg['topic'].split('.')[-1]) - update_type = msg['type'] - data = msg['data'] - delta = {BID: [], ASK: []} + if 'funding' in conn.subscription and _pair in conn.subscription['funding']: + f = Funding( + self.id, + symbol, + Decimal(update['markPrice']), + Decimal(update['fundingRate']), + int(update['nextFundingTime']), + int(msg['ts']), + None, + raw=update + ) + await self.callback(FUNDING, f, timestamp) - if update_type == 'snapshot': - delta = None - self._l2_book[pair] = OrderBook(self.id, pair, max_depth=self.max_depth) - # the USDT perpetual data is under the order_book key - if 'order_book' in data: - data = data['order_book'] + if 'open_interest' in conn.subscription and _pair in conn.subscription['open_interest']: + o = OpenInterest( + self.id, + symbol, + Decimal(update['openInterest']), + int(msg['ts']), + raw=update + ) - for update in data: - side = BID if update['side'] == 'Buy' else ASK - self._l2_book[pair].book[side][Decimal(update['price'])] = Decimal(update['size']) - else: - for delete in data['delete']: - side = BID if delete['side'] == 'Buy' else ASK - price = Decimal(delete['price']) - delta[side].append((price, 0)) - del self._l2_book[pair].book[side][price] - - for utype in ('update', 'insert'): - for update in data[utype]: - side = BID if update['side'] == 'Buy' else ASK - price = Decimal(update['price']) - amount = Decimal(update['size']) - delta[side].append((price, amount)) - self._l2_book[pair].book[side][price] = amount - - # timestamp is in microseconds - ts = msg['timestamp_e6'] - if isinstance(ts, str): - ts = int(ts) - await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, timestamp=ts / 1000000, raw=msg, delta=delta) + await self.callback(OPEN_INTEREST, o, timestamp) + + if 'index' in conn.subscription and _pair in conn.subscription['index']: + i = Index( + self.id, + symbol, + Decimal(update['indexPrice']), + int(msg['ts']), + raw=update + ) + + await self.callback(INDEX, i, timestamp) async def _order(self, msg: dict, timestamp: float): """