diff --git a/README.md b/README.md index f668e3d96f..d3b4b0a8dc 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ We created hummingbot to promote **decentralized market-making**: enabling membe | Bitfinex | bitfinex | [Bitfinex](https://www.bitfinex.com/) | 2 | [API](https://docs.bitfinex.com/docs/introduction) |![YELLOW](https://via.placeholder.com/15/ffff00/?text=+) | | Blocktane | blocktane | [Blocktane](https://blocktane.io/) | 2 | [API](https://blocktane.io/api) |![GREEN](https://via.placeholder.com/15/008000/?text=+) | | Coinbase Pro | coinbase_pro | [Coinbase Pro](https://pro.coinbase.com/) | * | [API](https://docs.pro.coinbase.com/) |![GREEN](https://via.placeholder.com/15/008000/?text=+) | +| CoinZoom | coinzoom | [CoinZoom](https://trade.coinzoom.com/landing) | * | [API](https://api-docs.coinzoom.com/) |![YELLOW](https://via.placeholder.com/15/ffff00/?text=+) | | Crypto.com | crypto_com | [Crypto.com](https://crypto.com/exchange) | 2 | [API](https://exchange-docs.crypto.com/#introduction) |![YELLOW](https://via.placeholder.com/15/ffff00/?text=+) | | DyDx | dydx | [dy/dx](https://dydx.exchange/) | 1 | [API](https://docs.dydx.exchange/) |![GREEN](https://via.placeholder.com/15/008000/?text=+) | | Eterbase | eterbase | [Eterbase](https://www.eterbase.com/) | * | [API](https://developers.eterbase.exchange/?version=latest) |![RED](https://via.placeholder.com/15/f03c15/?text=+) | diff --git a/assets/coinzoom_logo.svg b/assets/coinzoom_logo.svg new file mode 100644 index 0000000000..8184f907e7 --- /dev/null +++ b/assets/coinzoom_logo.svg @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + diff --git a/conf/__init__.py b/conf/__init__.py index 7854c51a99..69e87838fa 100644 --- a/conf/__init__.py +++ b/conf/__init__.py @@ -108,6 +108,11 @@ hitbtc_api_key = os.getenv("HITBTC_API_KEY") hitbtc_secret_key = os.getenv("HITBTC_SECRET_KEY") +# CoinZoom Test +coinzoom_api_key = os.getenv("COINZOOM_API_KEY") +coinzoom_secret_key = os.getenv("COINZOOM_SECRET_KEY") +coinzoom_username = os.getenv("COINZOOM_USERNAME") + # Wallet Tests test_erc20_token_address = os.getenv("TEST_ERC20_TOKEN_ADDRESS") web3_test_private_key_a = os.getenv("TEST_WALLET_PRIVATE_KEY_A") diff --git a/hummingbot/connector/connector_status.py b/hummingbot/connector/connector_status.py index 14a0968094..abe0f7dd2b 100644 --- a/hummingbot/connector/connector_status.py +++ b/hummingbot/connector/connector_status.py @@ -13,6 +13,7 @@ 'blocktane': 'green', 'celo': 'green', 'coinbase_pro': 'green', + 'coinzoom': 'yellow', 'crypto_com': 'yellow', 'dydx': 'green', 'eterbase': 'red', diff --git a/hummingbot/connector/exchange/coinzoom/__init__.py b/hummingbot/connector/exchange/coinzoom/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_active_order_tracker.pxd b/hummingbot/connector/exchange/coinzoom/coinzoom_active_order_tracker.pxd new file mode 100644 index 0000000000..881d7862df --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_active_order_tracker.pxd @@ -0,0 +1,13 @@ +# distutils: language=c++ +cimport numpy as np + +cdef class CoinzoomActiveOrderTracker: + cdef dict _active_bids + cdef dict _active_asks + cdef dict _active_asks_ids + cdef dict _active_bids_ids + + cdef tuple c_convert_diff_message_to_np_arrays(self, object message) + cdef tuple c_convert_snapshot_message_to_np_arrays(self, object message) + # This method doesn't seem to be used anywhere at all + # cdef np.ndarray[np.float64_t, ndim=1] c_convert_trade_message_to_np_array(self, object message) diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_active_order_tracker.pyx b/hummingbot/connector/exchange/coinzoom/coinzoom_active_order_tracker.pyx new file mode 100644 index 0000000000..a7e4fcb815 --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_active_order_tracker.pyx @@ -0,0 +1,157 @@ +# distutils: language=c++ +# distutils: sources=hummingbot/core/cpp/OrderBookEntry.cpp +import logging +import numpy as np +from decimal import Decimal +from typing import Dict +from hummingbot.logger import HummingbotLogger +from hummingbot.core.data_type.order_book_row import OrderBookRow + +_logger = None +s_empty_diff = np.ndarray(shape=(0, 4), dtype="float64") +CoinzoomOrderBookTrackingDictionary = Dict[Decimal, Dict[str, Dict[str, any]]] + +cdef class CoinzoomActiveOrderTracker: + def __init__(self, + active_asks: CoinzoomOrderBookTrackingDictionary = None, + active_bids: CoinzoomOrderBookTrackingDictionary = None): + super().__init__() + self._active_asks = active_asks or {} + self._active_bids = active_bids or {} + self._active_asks_ids = {} + self._active_bids_ids = {} + + @classmethod + def logger(cls) -> HummingbotLogger: + global _logger + if _logger is None: + _logger = logging.getLogger(__name__) + return _logger + + @property + def active_asks(self) -> CoinzoomOrderBookTrackingDictionary: + return self._active_asks + + @property + def active_bids(self) -> CoinzoomOrderBookTrackingDictionary: + return self._active_bids + + # TODO: research this more + def volume_for_ask_price(self, price) -> float: + return NotImplementedError + + # TODO: research this more + def volume_for_bid_price(self, price) -> float: + return NotImplementedError + + def get_rates_and_quantities(self, entry) -> tuple: + # price, quantity + return float(entry[0]), float(entry[1]) + + def get_rates_and_amts_with_ids(self, entry, id_list) -> tuple: + if len(entry) > 1: + price = float(entry[1]) + amount = float(entry[2]) + id_list[str(entry[0])] = price + else: + price = id_list.get(str(entry[0])) + amount = 0.0 + return price, amount + + cdef tuple c_convert_diff_message_to_np_arrays(self, object message): + cdef: + dict content = message.content + list content_keys = list(content.keys()) + list bid_entries = [] + list ask_entries = [] + str order_id + str order_side + str price_raw + object price + dict order_dict + double timestamp = message.timestamp + double amount = 0 + dict nps = {'bids': s_empty_diff, 'asks': s_empty_diff} + + if "b" in content_keys: + bid_entries = content["b"] + if "s" in content_keys: + ask_entries = content["s"] + + for entries, diff_key, id_list in [ + (bid_entries, 'bids', self._active_bids_ids), + (ask_entries, 'asks', self._active_asks_ids) + ]: + if len(entries) > 0: + nps[diff_key] = np.array( + [[timestamp, price, amount, message.update_id] + for price, amount in [self.get_rates_and_amts_with_ids(entry, id_list) for entry in entries] + if price is not None], + dtype="float64", ndmin=2 + ) + return nps['bids'], nps['asks'] + + cdef tuple c_convert_snapshot_message_to_np_arrays(self, object message): + cdef: + float price + float amount + str order_id + dict order_dict + + # Refresh all order tracking. + self._active_bids.clear() + self._active_asks.clear() + timestamp = message.timestamp + content = message.content + content_keys = list(content.keys()) + + if "bids" in content_keys: + for snapshot_orders, active_orders in [(content["bids"], self._active_bids), (content["asks"], self._active_asks)]: + for entry in snapshot_orders: + price, amount = self.get_rates_and_quantities(entry) + active_orders[price] = amount + else: + for snapshot_orders, active_orders, active_order_ids in [ + (content["b"], self._active_bids, self._active_bids_ids), + (content["s"], self._active_asks, self._active_asks_ids) + ]: + for entry in snapshot_orders: + price, amount = self.get_rates_and_amts_with_ids(entry, active_order_ids) + active_orders[price] = amount + + # Return the sorted snapshot tables. + cdef: + np.ndarray[np.float64_t, ndim=2] bids = np.array( + [[message.timestamp, float(price), float(self._active_bids[price]), message.update_id] + for price in sorted(self._active_bids.keys())], dtype='float64', ndmin=2) + np.ndarray[np.float64_t, ndim=2] asks = np.array( + [[message.timestamp, float(price), float(self._active_asks[price]), message.update_id] + for price in sorted(self._active_asks.keys(), reverse=True)], dtype='float64', ndmin=2) + + if bids.shape[1] != 4: + bids = bids.reshape((0, 4)) + if asks.shape[1] != 4: + asks = asks.reshape((0, 4)) + + return bids, asks + + # This method doesn't seem to be used anywhere at all + # cdef np.ndarray[np.float64_t, ndim=1] c_convert_trade_message_to_np_array(self, object message): + # cdef: + # double trade_type_value = 1.0 if message.content[4] == "BUY" else 2.0 + # list content = message.content + + # return np.array([message.timestamp, trade_type_value, float(content[1]), float(content[2])], + # dtype="float64") + + def convert_diff_message_to_order_book_row(self, message): + np_bids, np_asks = self.c_convert_diff_message_to_np_arrays(message) + bids_row = [OrderBookRow(price, qty, update_id) for ts, price, qty, update_id in np_bids] + asks_row = [OrderBookRow(price, qty, update_id) for ts, price, qty, update_id in np_asks] + return bids_row, asks_row + + def convert_snapshot_message_to_order_book_row(self, message): + np_bids, np_asks = self.c_convert_snapshot_message_to_np_arrays(message) + bids_row = [OrderBookRow(price, qty, update_id) for ts, price, qty, update_id in np_bids] + asks_row = [OrderBookRow(price, qty, update_id) for ts, price, qty, update_id in np_asks] + return bids_row, asks_row diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_api_order_book_data_source.py b/hummingbot/connector/exchange/coinzoom/coinzoom_api_order_book_data_source.py new file mode 100644 index 0000000000..49032d6f7f --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_api_order_book_data_source.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python +import asyncio +import logging +import time +import pandas as pd +from decimal import Decimal +from typing import Optional, List, Dict, Any +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.core.data_type.order_book_message import OrderBookMessage +from hummingbot.core.data_type.order_book_tracker_data_source import OrderBookTrackerDataSource +from hummingbot.logger import HummingbotLogger +from .coinzoom_constants import Constants +from .coinzoom_active_order_tracker import CoinzoomActiveOrderTracker +from .coinzoom_order_book import CoinzoomOrderBook +from .coinzoom_websocket import CoinzoomWebsocket +from .coinzoom_utils import ( + convert_to_exchange_trading_pair, + convert_from_exchange_trading_pair, + api_call_with_retries, + CoinzoomAPIError, +) + + +class CoinzoomAPIOrderBookDataSource(OrderBookTrackerDataSource): + _logger: Optional[HummingbotLogger] = None + + @classmethod + def logger(cls) -> HummingbotLogger: + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + def __init__(self, trading_pairs: List[str] = None): + super().__init__(trading_pairs) + self._trading_pairs: List[str] = trading_pairs + self._snapshot_msg: Dict[str, any] = {} + + @classmethod + async def get_last_traded_prices(cls, trading_pairs: List[str]) -> Dict[str, Decimal]: + results = {} + tickers: List[Dict[Any]] = await api_call_with_retries("GET", Constants.ENDPOINT["TICKER"]) + for trading_pair in trading_pairs: + ex_pair: str = convert_to_exchange_trading_pair(trading_pair, True) + ticker: Dict[Any] = list([tic for symbol, tic in tickers.items() if symbol == ex_pair])[0] + results[trading_pair]: Decimal = Decimal(str(ticker["last_price"])) + return results + + @staticmethod + async def fetch_trading_pairs() -> List[str]: + try: + symbols: List[Dict[str, Any]] = await api_call_with_retries("GET", Constants.ENDPOINT["SYMBOL"]) + trading_pairs: List[str] = list([convert_from_exchange_trading_pair(sym["symbol"]) for sym in symbols]) + # Filter out unmatched pairs so nothing breaks + return [sym for sym in trading_pairs if sym is not None] + except Exception: + # Do nothing if the request fails -- there will be no autocomplete for CoinZoom trading pairs + pass + return [] + + @staticmethod + async def get_order_book_data(trading_pair: str) -> Dict[str, any]: + """ + Get whole orderbook + """ + try: + ex_pair = convert_to_exchange_trading_pair(trading_pair, True) + ob_endpoint = Constants.ENDPOINT["ORDER_BOOK"].format(trading_pair=ex_pair) + orderbook_response: Dict[Any] = await api_call_with_retries("GET", ob_endpoint) + return orderbook_response + except CoinzoomAPIError as e: + err = e.error_payload.get('error', e.error_payload) + raise IOError( + f"Error fetching OrderBook for {trading_pair} at {Constants.EXCHANGE_NAME}. " + f"HTTP status is {e.error_payload['status']}. Error is {err.get('message', str(err))}.") + + async def get_new_order_book(self, trading_pair: str) -> OrderBook: + snapshot: Dict[str, Any] = await self.get_order_book_data(trading_pair) + snapshot_timestamp: float = float(snapshot['timestamp']) + snapshot_msg: OrderBookMessage = CoinzoomOrderBook.snapshot_message_from_exchange( + snapshot, + snapshot_timestamp, + metadata={"trading_pair": trading_pair}) + order_book = self.order_book_create_function() + active_order_tracker: CoinzoomActiveOrderTracker = CoinzoomActiveOrderTracker() + bids, asks = active_order_tracker.convert_snapshot_message_to_order_book_row(snapshot_msg) + order_book.apply_snapshot(bids, asks, snapshot_msg.update_id) + return order_book + + async def listen_for_trades(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue): + """ + Listen for trades using websocket trade channel + """ + while True: + try: + ws = CoinzoomWebsocket() + await ws.connect() + + for pair in self._trading_pairs: + await ws.subscribe({Constants.WS_SUB["TRADES"]: {'symbol': convert_to_exchange_trading_pair(pair)}}) + + async for response in ws.on_message(): + msg_keys = list(response.keys()) if response is not None else [] + + if not Constants.WS_METHODS["TRADES_UPDATE"] in msg_keys: + continue + + trade: List[Any] = response[Constants.WS_METHODS["TRADES_UPDATE"]] + trade_msg: OrderBookMessage = CoinzoomOrderBook.trade_message_from_exchange(trade) + output.put_nowait(trade_msg) + + except asyncio.CancelledError: + raise + except Exception: + self.logger().error("Unexpected error.", exc_info=True) + raise + await asyncio.sleep(5.0) + finally: + await ws.disconnect() + + async def listen_for_order_book_diffs(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue): + """ + Listen for orderbook diffs using websocket book channel + """ + while True: + try: + ws = CoinzoomWebsocket() + await ws.connect() + + order_book_methods = [ + Constants.WS_METHODS['ORDERS_SNAPSHOT'], + Constants.WS_METHODS['ORDERS_UPDATE'], + ] + + for pair in self._trading_pairs: + ex_pair = convert_to_exchange_trading_pair(pair) + ws_stream = { + Constants.WS_SUB["ORDERS"]: { + 'requestId': ex_pair, + 'symbol': ex_pair, + 'aggregate': False, + 'depth': 0, + } + } + await ws.subscribe(ws_stream) + + async for response in ws.on_message(): + msg_keys = list(response.keys()) if response is not None else [] + + method_key = [key for key in msg_keys if key in order_book_methods] + + if len(method_key) != 1: + continue + + method: str = method_key[0] + order_book_data: dict = response + timestamp: int = int(time.time() * 1e3) + pair: str = convert_from_exchange_trading_pair(response[method]) + + order_book_msg_cls = (CoinzoomOrderBook.diff_message_from_exchange + if method == Constants.WS_METHODS['ORDERS_UPDATE'] else + CoinzoomOrderBook.snapshot_message_from_exchange) + + orderbook_msg: OrderBookMessage = order_book_msg_cls( + order_book_data, + timestamp, + metadata={"trading_pair": pair}) + output.put_nowait(orderbook_msg) + + except asyncio.CancelledError: + raise + except Exception: + self.logger().network( + "Unexpected error with WebSocket connection.", exc_info=True, + app_warning_msg="Unexpected error with WebSocket connection. Retrying in 30 seconds. " + "Check network connection.") + await asyncio.sleep(30.0) + finally: + await ws.disconnect() + + async def listen_for_order_book_snapshots(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue): + """ + Listen for orderbook snapshots by fetching orderbook + """ + while True: + try: + for trading_pair in self._trading_pairs: + try: + snapshot: Dict[str, any] = await self.get_order_book_data(trading_pair) + snapshot_msg: OrderBookMessage = CoinzoomOrderBook.snapshot_message_from_exchange( + snapshot, + snapshot['timestamp'], + metadata={"trading_pair": trading_pair} + ) + output.put_nowait(snapshot_msg) + self.logger().debug(f"Saved order book snapshot for {trading_pair}") + # Be careful not to go above API rate limits. + await asyncio.sleep(5.0) + except asyncio.CancelledError: + raise + except Exception: + self.logger().network( + "Unexpected error with WebSocket connection.", exc_info=True, + app_warning_msg="Unexpected error with WebSocket connection. Retrying in 5 seconds. " + "Check network connection.") + await asyncio.sleep(5.0) + this_hour: pd.Timestamp = pd.Timestamp.utcnow().replace(minute=0, second=0, microsecond=0) + next_hour: pd.Timestamp = this_hour + pd.Timedelta(hours=1) + delta: float = next_hour.timestamp() - time.time() + await asyncio.sleep(delta) + except asyncio.CancelledError: + raise + except Exception: + self.logger().error("Unexpected error.", exc_info=True) + await asyncio.sleep(5.0) diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_api_user_stream_data_source.py b/hummingbot/connector/exchange/coinzoom/coinzoom_api_user_stream_data_source.py new file mode 100755 index 0000000000..7aede77e89 --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_api_user_stream_data_source.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +import time +import asyncio +import logging +from typing import ( + Any, + AsyncIterable, + List, + Optional, +) +from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource +from hummingbot.logger import HummingbotLogger +from .coinzoom_constants import Constants +from .coinzoom_auth import CoinzoomAuth +from .coinzoom_utils import CoinzoomAPIError +from .coinzoom_websocket import CoinzoomWebsocket + + +class CoinzoomAPIUserStreamDataSource(UserStreamTrackerDataSource): + + _logger: Optional[HummingbotLogger] = None + + @classmethod + def logger(cls) -> HummingbotLogger: + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + def __init__(self, coinzoom_auth: CoinzoomAuth, trading_pairs: Optional[List[str]] = []): + self._coinzoom_auth: CoinzoomAuth = coinzoom_auth + self._ws: CoinzoomWebsocket = None + self._trading_pairs = trading_pairs + self._current_listen_key = None + self._listen_for_user_stream_task = None + self._last_recv_time: float = 0 + super().__init__() + + @property + def last_recv_time(self) -> float: + return self._last_recv_time + + async def _ws_request_balances(self): + return await self._ws.request(Constants.WS_METHODS["USER_BALANCE"]) + + async def _listen_to_orders_trades_balances(self) -> AsyncIterable[Any]: + """ + Subscribe to active orders via web socket + """ + + try: + self._ws = CoinzoomWebsocket(self._coinzoom_auth) + + await self._ws.connect() + + await self._ws.subscribe({Constants.WS_SUB["USER_ORDERS_TRADES"]: {}}) + + event_methods = [ + Constants.WS_METHODS["USER_ORDERS"], + # We don't need to know about pending cancels + # Constants.WS_METHODS["USER_ORDERS_CANCEL"], + ] + + async for msg in self._ws.on_message(): + self._last_recv_time = time.time() + + msg_keys = list(msg.keys()) if msg is not None else [] + + if not any(ws_method in msg_keys for ws_method in event_methods): + continue + yield msg + except Exception as e: + raise e + finally: + await self._ws.disconnect() + await asyncio.sleep(5) + + async def listen_for_user_stream(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue) -> AsyncIterable[Any]: + """ + *required + Subscribe to user stream via web socket, and keep the connection open for incoming messages + :param ev_loop: ev_loop to execute this function in + :param output: an async queue where the incoming messages are stored + """ + + while True: + try: + async for msg in self._listen_to_orders_trades_balances(): + output.put_nowait(msg) + except asyncio.CancelledError: + raise + except CoinzoomAPIError as e: + self.logger().error(e.error_payload.get('error'), exc_info=True) + raise + except Exception: + self.logger().error( + f"Unexpected error with {Constants.EXCHANGE_NAME} WebSocket connection. " + "Retrying after 30 seconds...", exc_info=True) + await asyncio.sleep(30.0) diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_auth.py b/hummingbot/connector/exchange/coinzoom/coinzoom_auth.py new file mode 100755 index 0000000000..9379f3716b --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_auth.py @@ -0,0 +1,31 @@ +from typing import Dict, Any + + +class CoinzoomAuth(): + """ + Auth class required by CoinZoom API + Learn more at https://exchange-docs.crypto.com/#digital-signature + """ + def __init__(self, api_key: str, secret_key: str, username: str): + self.api_key = api_key + self.secret_key = secret_key + self.username = username + + def get_ws_params(self) -> Dict[str, str]: + return { + "apiKey": str(self.api_key), + "secretKey": str(self.secret_key), + } + + def get_headers(self) -> Dict[str, Any]: + """ + Generates authentication headers required by CoinZoom + :return: a dictionary of auth headers + """ + headers = { + "Content-Type": "application/json", + "Coinzoom-Api-Key": str(self.api_key), + "Coinzoom-Api-Secret": str(self.secret_key), + "User-Agent": f"hummingbot ZoomMe: {self.username}" + } + return headers diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_constants.py b/hummingbot/connector/exchange/coinzoom/coinzoom_constants.py new file mode 100644 index 0000000000..0cad1cb049 --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_constants.py @@ -0,0 +1,60 @@ +# A single source of truth for constant variables related to the exchange +class Constants: + """ + API Documentation Links: + https://api-docs.coinzoom.com/ + https://api-markets.coinzoom.com/ + """ + EXCHANGE_NAME = "coinzoom" + REST_URL = "https://api.coinzoom.com/api/v1/public" + # REST_URL = "https://api.stage.coinzoom.com/api/v1/public" + WS_PRIVATE_URL = "wss://api.coinzoom.com/api/v1/public/market/data/stream" + # WS_PRIVATE_URL = "wss://api.stage.coinzoom.com/api/v1/public/market/data/stream" + WS_PUBLIC_URL = "wss://api.coinzoom.com/api/v1/public/market/data/stream" + # WS_PUBLIC_URL = "wss://api.stage.coinzoom.com/api/v1/public/market/data/stream" + + HBOT_BROKER_ID = "CZ_API_HBOT" + + ENDPOINT = { + # Public Endpoints + "TICKER": "marketwatch/ticker", + "SYMBOL": "instruments", + "ORDER_BOOK": "marketwatch/orderbook/{trading_pair}/150/2", + "ORDER_CREATE": "orders/new", + "ORDER_DELETE": "orders/cancel", + "ORDER_STATUS": "orders/list", + "USER_ORDERS": "orders/list", + "USER_BALANCES": "ledger/list", + } + + WS_SUB = { + "TRADES": "TradeSummaryRequest", + "ORDERS": "OrderBookRequest", + "USER_ORDERS_TRADES": "OrderUpdateRequest", + + } + + WS_METHODS = { + "ORDERS_SNAPSHOT": "ob", + "ORDERS_UPDATE": "oi", + "TRADES_UPDATE": "ts", + "USER_BALANCE": "getTradingBalance", + "USER_ORDERS": "OrderResponse", + "USER_ORDERS_CANCEL": "OrderCancelResponse", + } + + # Timeouts + MESSAGE_TIMEOUT = 30.0 + PING_TIMEOUT = 10.0 + API_CALL_TIMEOUT = 10.0 + API_MAX_RETRIES = 4 + + # Intervals + # Only used when nothing is received from WS + SHORT_POLL_INTERVAL = 5.0 + # One minute should be fine since we request balance updates on order updates + LONG_POLL_INTERVAL = 60.0 + # One minute should be fine for order status since we get these via WS + UPDATE_ORDER_STATUS_INTERVAL = 60.0 + # 10 minute interval to update trading rules, these would likely never change whilst running. + INTERVAL_TRADING_RULES = 600 diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_exchange.py b/hummingbot/connector/exchange/coinzoom/coinzoom_exchange.py new file mode 100644 index 0000000000..65108d7475 --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_exchange.py @@ -0,0 +1,919 @@ +import logging +from typing import ( + Dict, + List, + Optional, + Any, + AsyncIterable, +) +from decimal import Decimal +import asyncio +import aiohttp +import math +import time +import ujson +from async_timeout import timeout + +from hummingbot.core.network_iterator import NetworkStatus +from hummingbot.logger import HummingbotLogger +from hummingbot.core.clock import Clock +from hummingbot.core.utils.asyncio_throttle import Throttler +from hummingbot.core.utils.async_utils import safe_ensure_future, safe_gather +from hummingbot.connector.trading_rule import TradingRule +from hummingbot.core.data_type.cancellation_result import CancellationResult +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.core.data_type.limit_order import LimitOrder +from hummingbot.core.event.events import ( + MarketEvent, + BuyOrderCompletedEvent, + SellOrderCompletedEvent, + OrderFilledEvent, + OrderCancelledEvent, + BuyOrderCreatedEvent, + SellOrderCreatedEvent, + MarketOrderFailureEvent, + OrderType, + TradeType, + TradeFee +) +from hummingbot.connector.exchange_base import ExchangeBase +from hummingbot.connector.exchange.coinzoom.coinzoom_order_book_tracker import CoinzoomOrderBookTracker +from hummingbot.connector.exchange.coinzoom.coinzoom_user_stream_tracker import CoinzoomUserStreamTracker +from hummingbot.connector.exchange.coinzoom.coinzoom_auth import CoinzoomAuth +from hummingbot.connector.exchange.coinzoom.coinzoom_in_flight_order import CoinzoomInFlightOrder +from hummingbot.connector.exchange.coinzoom.coinzoom_utils import ( + convert_from_exchange_trading_pair, + convert_to_exchange_trading_pair, + get_new_client_order_id, + aiohttp_response_with_errors, + retry_sleep_time, + str_date_to_ts, + CoinzoomAPIError, +) +from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants +from hummingbot.core.data_type.common import OpenOrder +ctce_logger = None +s_decimal_NaN = Decimal("nan") + + +class CoinzoomExchange(ExchangeBase): + """ + CoinzoomExchange connects with CoinZoom exchange and provides order book pricing, user account tracking and + trading functionality. + """ + ORDER_NOT_EXIST_CONFIRMATION_COUNT = 3 + ORDER_NOT_EXIST_CANCEL_COUNT = 2 + + @classmethod + def logger(cls) -> HummingbotLogger: + global ctce_logger + if ctce_logger is None: + ctce_logger = logging.getLogger(__name__) + return ctce_logger + + def __init__(self, + coinzoom_api_key: str, + coinzoom_secret_key: str, + coinzoom_username: str, + trading_pairs: Optional[List[str]] = None, + trading_required: bool = True + ): + """ + :param coinzoom_api_key: The API key to connect to private CoinZoom APIs. + :param coinzoom_secret_key: The API secret. + :param coinzoom_username: The ZoomMe Username. + :param trading_pairs: The market trading pairs which to track order book data. + :param trading_required: Whether actual trading is needed. + """ + super().__init__() + self._trading_required = trading_required + self._trading_pairs = trading_pairs + self._coinzoom_auth = CoinzoomAuth(coinzoom_api_key, coinzoom_secret_key, coinzoom_username) + self._order_book_tracker = CoinzoomOrderBookTracker(trading_pairs=trading_pairs) + self._user_stream_tracker = CoinzoomUserStreamTracker(self._coinzoom_auth, trading_pairs) + self._ev_loop = asyncio.get_event_loop() + self._shared_client = None + self._poll_notifier = asyncio.Event() + self._last_timestamp = 0 + self._in_flight_orders = {} # Dict[client_order_id:str, CoinzoomInFlightOrder] + self._order_not_found_records = {} # Dict[client_order_id:str, count:int] + self._trading_rules = {} # Dict[trading_pair:str, TradingRule] + self._status_polling_task = None + self._user_stream_event_listener_task = None + self._trading_rules_polling_task = None + self._last_poll_timestamp = 0 + self._throttler = Throttler(rate_limit = (8.0, 6)) + + @property + def name(self) -> str: + return "coinzoom" + + @property + def order_books(self) -> Dict[str, OrderBook]: + return self._order_book_tracker.order_books + + @property + def trading_rules(self) -> Dict[str, TradingRule]: + return self._trading_rules + + @property + def in_flight_orders(self) -> Dict[str, CoinzoomInFlightOrder]: + return self._in_flight_orders + + @property + def status_dict(self) -> Dict[str, bool]: + """ + A dictionary of statuses of various connector's components. + """ + return { + "order_books_initialized": self._order_book_tracker.ready, + "account_balance": len(self._account_balances) > 0 if self._trading_required else True, + "trading_rule_initialized": len(self._trading_rules) > 0, + "user_stream_initialized": + self._user_stream_tracker.data_source.last_recv_time > 0 if self._trading_required else True, + } + + @property + def ready(self) -> bool: + """ + :return True when all statuses pass, this might take 5-10 seconds for all the connector's components and + services to be ready. + """ + return all(self.status_dict.values()) + + @property + def limit_orders(self) -> List[LimitOrder]: + return [ + in_flight_order.to_limit_order() + for in_flight_order in self._in_flight_orders.values() + ] + + @property + def tracking_states(self) -> Dict[str, any]: + """ + :return active in-flight orders in json format, is used to save in sqlite db. + """ + return { + key: value.to_json() + for key, value in self._in_flight_orders.items() + if not value.is_done + } + + def restore_tracking_states(self, saved_states: Dict[str, any]): + """ + Restore in-flight orders from saved tracking states, this is st the connector can pick up on where it left off + when it disconnects. + :param saved_states: The saved tracking_states. + """ + self._in_flight_orders.update({ + key: CoinzoomInFlightOrder.from_json(value) + for key, value in saved_states.items() + }) + + def supported_order_types(self) -> List[OrderType]: + """ + :return a list of OrderType supported by this connector. + Note that Market order type is no longer required and will not be used. + """ + return [OrderType.LIMIT, OrderType.LIMIT_MAKER] + + def start(self, clock: Clock, timestamp: float): + """ + This function is called automatically by the clock. + """ + super().start(clock, timestamp) + + def stop(self, clock: Clock): + """ + This function is called automatically by the clock. + """ + super().stop(clock) + + async def start_network(self): + """ + This function is required by NetworkIterator base class and is called automatically. + It starts tracking order book, polling trading rules, + updating statuses and tracking user data. + """ + self._order_book_tracker.start() + self._trading_rules_polling_task = safe_ensure_future(self._trading_rules_polling_loop()) + if self._trading_required: + self._status_polling_task = safe_ensure_future(self._status_polling_loop()) + self._user_stream_tracker_task = safe_ensure_future(self._user_stream_tracker.start()) + self._user_stream_event_listener_task = safe_ensure_future(self._user_stream_event_listener()) + + async def stop_network(self): + """ + This function is required by NetworkIterator base class and is called automatically. + """ + self._order_book_tracker.stop() + if self._status_polling_task is not None: + self._status_polling_task.cancel() + self._status_polling_task = None + if self._trading_rules_polling_task is not None: + self._trading_rules_polling_task.cancel() + self._trading_rules_polling_task = None + if self._status_polling_task is not None: + self._status_polling_task.cancel() + self._status_polling_task = None + if self._user_stream_tracker_task is not None: + self._user_stream_tracker_task.cancel() + self._user_stream_tracker_task = None + if self._user_stream_event_listener_task is not None: + self._user_stream_event_listener_task.cancel() + self._user_stream_event_listener_task = None + + async def check_network(self) -> NetworkStatus: + """ + This function is required by NetworkIterator base class and is called periodically to check + the network connection. Simply ping the network (or call any light weight public API). + """ + try: + # since there is no ping endpoint, the lowest rate call is to get BTC-USD symbol + await self._api_request("GET", Constants.ENDPOINT['SYMBOL']) + except asyncio.CancelledError: + raise + except Exception: + return NetworkStatus.NOT_CONNECTED + return NetworkStatus.CONNECTED + + async def _http_client(self) -> aiohttp.ClientSession: + """ + :returns Shared client session instance + """ + if self._shared_client is None: + self._shared_client = aiohttp.ClientSession() + return self._shared_client + + async def _trading_rules_polling_loop(self): + """ + Periodically update trading rule. + """ + while True: + try: + await self._update_trading_rules() + await asyncio.sleep(Constants.INTERVAL_TRADING_RULES) + except asyncio.CancelledError: + raise + except Exception as e: + self.logger().network(f"Unexpected error while fetching trading rules. Error: {str(e)}", + exc_info=True, + app_warning_msg=("Could not fetch new trading rules from " + f"{Constants.EXCHANGE_NAME}. Check network connection.")) + await asyncio.sleep(0.5) + + async def _update_trading_rules(self): + symbols_info = await self._api_request("GET", endpoint=Constants.ENDPOINT['SYMBOL']) + self._trading_rules.clear() + self._trading_rules = self._format_trading_rules(symbols_info) + + def _format_trading_rules(self, symbols_info: Dict[str, Any]) -> Dict[str, TradingRule]: + """ + Converts json API response into a dictionary of trading rules. + :param symbols_info: The json API response + :return A dictionary of trading rules. + Response Example: + [ + { + "symbol" : "BTC/USD", + "baseCurrencyCode" : "BTC", + "termCurrencyCode" : "USD", + "minTradeAmt" : 0.0001, + "maxTradeAmt" : 10, + "maxPricePrecision" : 2, + "maxQuantityPrecision" : 6, + "issueOnly" : false + } + ] + """ + result = {} + for rule in symbols_info: + try: + trading_pair = convert_from_exchange_trading_pair(rule["symbol"]) + min_amount = Decimal(str(rule["minTradeAmt"])) + min_price = Decimal(f"1e-{rule['maxPricePrecision']}") + result[trading_pair] = TradingRule(trading_pair, + min_order_size=min_amount, + max_order_size=Decimal(str(rule["maxTradeAmt"])), + min_price_increment=min_price, + min_base_amount_increment=min_amount, + min_notional_size=min(min_price * min_amount, Decimal("0.00000001")), + max_price_significant_digits=Decimal(str(rule["maxPricePrecision"])), + ) + except Exception: + self.logger().error(f"Error parsing the trading pair rule {rule}. Skipping.", exc_info=True) + return result + + async def _api_request(self, + method: str, + endpoint: str, + params: Optional[Dict[str, Any]] = None, + is_auth_required: bool = False, + try_count: int = 0) -> Dict[str, Any]: + """ + Sends an aiohttp request and waits for a response. + :param method: The HTTP method, e.g. get or post + :param endpoint: The path url or the API end point + :param params: Additional get/post parameters + :param is_auth_required: Whether an authentication is required, when True the function will add encrypted + signature to the request. + :returns A response in json format. + """ + async with self._throttler.weighted_task(request_weight=1): + url = f"{Constants.REST_URL}/{endpoint}" + shared_client = await self._http_client() + # Turn `params` into either GET params or POST body data + qs_params: dict = params if method.upper() == "GET" else None + req_params = ujson.dumps(params) if method.upper() == "POST" and params is not None else None + # Generate auth headers if needed. + headers: dict = {"Content-Type": "application/json", "User-Agent": "hummingbot"} + if is_auth_required: + headers: dict = self._coinzoom_auth.get_headers() + # Build request coro + response_coro = shared_client.request(method=method.upper(), url=url, headers=headers, + params=qs_params, data=req_params, + timeout=Constants.API_CALL_TIMEOUT) + http_status, parsed_response, request_errors = await aiohttp_response_with_errors(response_coro) + if request_errors or parsed_response is None: + if try_count < Constants.API_MAX_RETRIES: + try_count += 1 + time_sleep = retry_sleep_time(try_count) + self.logger().info(f"Error fetching data from {url}. HTTP status is {http_status}. " + f"Retrying in {time_sleep:.0f}s.") + await asyncio.sleep(time_sleep) + return await self._api_request(method=method, endpoint=endpoint, params=params, + is_auth_required=is_auth_required, try_count=try_count) + else: + raise CoinzoomAPIError({"error": parsed_response, "status": http_status}) + if "error" in parsed_response: + raise CoinzoomAPIError(parsed_response) + return parsed_response + + def get_order_price_quantum(self, trading_pair: str, price: Decimal): + """ + Returns a price step, a minimum price increment for a given trading pair. + """ + trading_rule = self._trading_rules[trading_pair] + return trading_rule.min_price_increment + + def get_order_size_quantum(self, trading_pair: str, order_size: Decimal): + """ + Returns an order amount step, a minimum amount increment for a given trading pair. + """ + trading_rule = self._trading_rules[trading_pair] + return Decimal(trading_rule.min_base_amount_increment) + + def get_order_book(self, trading_pair: str) -> OrderBook: + if trading_pair not in self._order_book_tracker.order_books: + raise ValueError(f"No order book exists for '{trading_pair}'.") + return self._order_book_tracker.order_books[trading_pair] + + def buy(self, trading_pair: str, amount: Decimal, order_type=OrderType.MARKET, + price: Decimal = s_decimal_NaN, **kwargs) -> str: + """ + Buys an amount of base asset (of the given trading pair). This function returns immediately. + To see an actual order, you'll have to wait for BuyOrderCreatedEvent. + :param trading_pair: The market (e.g. BTC-USDT) to buy from + :param amount: The amount in base token value + :param order_type: The order type + :param price: The price (note: this is no longer optional) + :returns A new internal order id + """ + order_id: str = get_new_client_order_id(True, trading_pair) + safe_ensure_future(self._create_order(TradeType.BUY, order_id, trading_pair, amount, order_type, price)) + return order_id + + def sell(self, trading_pair: str, amount: Decimal, order_type=OrderType.MARKET, + price: Decimal = s_decimal_NaN, **kwargs) -> str: + """ + Sells an amount of base asset (of the given trading pair). This function returns immediately. + To see an actual order, you'll have to wait for SellOrderCreatedEvent. + :param trading_pair: The market (e.g. BTC-USDT) to sell from + :param amount: The amount in base token value + :param order_type: The order type + :param price: The price (note: this is no longer optional) + :returns A new internal order id + """ + order_id: str = get_new_client_order_id(False, trading_pair) + safe_ensure_future(self._create_order(TradeType.SELL, order_id, trading_pair, amount, order_type, price)) + return order_id + + def cancel(self, trading_pair: str, order_id: str): + """ + Cancel an order. This function returns immediately. + To get the cancellation result, you'll have to wait for OrderCancelledEvent. + :param trading_pair: The market (e.g. BTC-USDT) of the order. + :param order_id: The internal order id (also called client_order_id) + """ + safe_ensure_future(self._execute_cancel(trading_pair, order_id)) + return order_id + + async def _create_order(self, + trade_type: TradeType, + order_id: str, + trading_pair: str, + amount: Decimal, + order_type: OrderType, + price: Decimal): + """ + Calls create-order API end point to place an order, starts tracking the order and triggers order created event. + :param trade_type: BUY or SELL + :param order_id: Internal order id (also called client_order_id) + :param trading_pair: The market to place order + :param amount: The order amount (in base token value) + :param order_type: The order type + :param price: The order price + """ + if not order_type.is_limit_type(): + raise Exception(f"Unsupported order type: {order_type}") + trading_rule = self._trading_rules[trading_pair] + + amount = self.quantize_order_amount(trading_pair, amount) + price = self.quantize_order_price(trading_pair, price) + if amount < trading_rule.min_order_size: + raise ValueError(f"Buy order amount {amount} is lower than the minimum order size " + f"{trading_rule.min_order_size}.") + order_type_str = order_type.name.upper().split("_")[0] + api_params = {"symbol": convert_to_exchange_trading_pair(trading_pair), + "orderType": order_type_str, + "orderSide": trade_type.name.upper(), + "quantity": f"{amount:f}", + "price": f"{price:f}", + "originType": Constants.HBOT_BROKER_ID, + # CoinZoom doesn't support client order id yet + # "clientOrderId": order_id, + "payFeesWithZoomToken": "true", + } + self.start_tracking_order(order_id, None, trading_pair, trade_type, price, amount, order_type) + try: + order_result = await self._api_request("POST", Constants.ENDPOINT["ORDER_CREATE"], api_params, True) + exchange_order_id = str(order_result) + tracked_order = self._in_flight_orders.get(order_id) + if tracked_order is not None: + self.logger().info(f"Created {order_type.name} {trade_type.name} order {order_id} for " + f"{amount} {trading_pair}.") + tracked_order.update_exchange_order_id(exchange_order_id) + if trade_type is TradeType.BUY: + event_tag = MarketEvent.BuyOrderCreated + event_cls = BuyOrderCreatedEvent + else: + event_tag = MarketEvent.SellOrderCreated + event_cls = SellOrderCreatedEvent + self.trigger_event(event_tag, + event_cls(self.current_timestamp, order_type, trading_pair, amount, price, order_id)) + except asyncio.CancelledError: + raise + except CoinzoomAPIError as e: + error_reason = e.error_payload.get('error', {}).get('message') + self.stop_tracking_order(order_id) + self.logger().network( + f"Error submitting {trade_type.name} {order_type.name} order to {Constants.EXCHANGE_NAME} for " + f"{amount} {trading_pair} {price} - {error_reason}.", + exc_info=True, + app_warning_msg=(f"Error submitting order to {Constants.EXCHANGE_NAME} - {error_reason}.") + ) + self.trigger_event(MarketEvent.OrderFailure, + MarketOrderFailureEvent(self.current_timestamp, order_id, order_type)) + + def start_tracking_order(self, + order_id: str, + exchange_order_id: str, + trading_pair: str, + trade_type: TradeType, + price: Decimal, + amount: Decimal, + order_type: OrderType): + """ + Starts tracking an order by simply adding it into _in_flight_orders dictionary. + """ + self._in_flight_orders[order_id] = CoinzoomInFlightOrder( + client_order_id=order_id, + exchange_order_id=exchange_order_id, + trading_pair=trading_pair, + order_type=order_type, + trade_type=trade_type, + price=price, + amount=amount + ) + + def stop_tracking_order(self, order_id: str): + """ + Stops tracking an order by simply removing it from _in_flight_orders dictionary. + """ + if order_id in self._in_flight_orders: + del self._in_flight_orders[order_id] + if order_id in self._order_not_found_records: + del self._order_not_found_records[order_id] + + async def _execute_cancel(self, trading_pair: str, order_id: str) -> str: + """ + Executes order cancellation process by first calling cancel-order API. The API result doesn't confirm whether + the cancellation is successful, it simply states it receives the request. + :param trading_pair: The market trading pair (Unused during cancel on CoinZoom) + :param order_id: The internal order id + order.last_state to change to CANCELED + """ + order_was_cancelled = False + try: + tracked_order = self._in_flight_orders.get(order_id) + if tracked_order is None: + raise ValueError(f"Failed to cancel order - {order_id}. Order not found.") + if tracked_order.exchange_order_id is None: + await tracked_order.get_exchange_order_id() + ex_order_id = tracked_order.exchange_order_id + api_params = { + "orderId": ex_order_id, + "symbol": convert_to_exchange_trading_pair(trading_pair) + } + await self._api_request("POST", + Constants.ENDPOINT["ORDER_DELETE"], + api_params, + is_auth_required=True) + order_was_cancelled = True + except asyncio.CancelledError: + raise + except CoinzoomAPIError as e: + err = e.error_payload.get('error', e.error_payload) + self.logger().error(f"Order Cancel API Error: {err}") + # CoinZoom doesn't report any error if the order wasn't found so we can only handle API failures here. + self._order_not_found_records[order_id] = self._order_not_found_records.get(order_id, 0) + 1 + if self._order_not_found_records[order_id] >= self.ORDER_NOT_EXIST_CANCEL_COUNT: + order_was_cancelled = True + if order_was_cancelled: + self.logger().info(f"Successfully cancelled order {order_id} on {Constants.EXCHANGE_NAME}.") + self.stop_tracking_order(order_id) + self.trigger_event(MarketEvent.OrderCancelled, + OrderCancelledEvent(self.current_timestamp, order_id)) + tracked_order.cancelled_event.set() + return CancellationResult(order_id, True) + else: + self.logger().network( + f"Failed to cancel order {order_id}: {err.get('message', str(err))}", + exc_info=True, + app_warning_msg=f"Failed to cancel the order {order_id} on {Constants.EXCHANGE_NAME}. " + f"Check API key and network connection." + ) + return CancellationResult(order_id, False) + + async def _status_polling_loop(self): + """ + Periodically update user balances and order status via REST API. This serves as a fallback measure for web + socket API updates. + """ + while True: + try: + self._poll_notifier = asyncio.Event() + await self._poll_notifier.wait() + await safe_gather( + self._update_balances(), + self._update_order_status(), + ) + self._last_poll_timestamp = self.current_timestamp + except asyncio.CancelledError: + raise + except Exception as e: + self.logger().error(str(e), exc_info=True) + warn_msg = (f"Could not fetch account updates from {Constants.EXCHANGE_NAME}. " + "Check API key and network connection.") + self.logger().network("Unexpected error while fetching account updates.", exc_info=True, + app_warning_msg=warn_msg) + await asyncio.sleep(0.5) + + async def _update_balances(self): + """ + Calls REST API to update total and available balances. + """ + account_info = await self._api_request("GET", Constants.ENDPOINT["USER_BALANCES"], is_auth_required=True) + self._process_balance_message(account_info) + + async def _update_order_status(self): + """ + Calls REST API to get status update for each in-flight order. + """ + last_tick = int(self._last_poll_timestamp / Constants.UPDATE_ORDER_STATUS_INTERVAL) + current_tick = int(self.current_timestamp / Constants.UPDATE_ORDER_STATUS_INTERVAL) + + if current_tick > last_tick and len(self._in_flight_orders) > 0: + tracked_orders = list(self._in_flight_orders.values()) + api_params = { + 'symbol': None, + 'orderSide': None, + 'orderStatuses': ["NEW", "PARTIALLY_FILLED"], + 'size': 500, + 'bookmarkOrderId': None + } + self.logger().debug(f"Polling for order status updates of {len(tracked_orders)} orders.") + open_orders = await self._api_request("POST", + Constants.ENDPOINT["ORDER_STATUS"], + api_params, + is_auth_required=True) + + open_orders_dict = {o['id']: o for o in open_orders} + found_ex_order_ids = list(open_orders_dict.keys()) + + for tracked_order in tracked_orders: + client_order_id = tracked_order.client_order_id + ex_order_id = tracked_order.exchange_order_id + if ex_order_id not in found_ex_order_ids: + self._order_not_found_records[client_order_id] = \ + self._order_not_found_records.get(client_order_id, 0) + 1 + if self._order_not_found_records[client_order_id] < self.ORDER_NOT_EXIST_CONFIRMATION_COUNT: + # Wait until the order is not found a few times before actually treating it as failed. + continue + self.trigger_event(MarketEvent.OrderFailure, + MarketOrderFailureEvent( + self.current_timestamp, client_order_id, tracked_order.order_type)) + self.stop_tracking_order(client_order_id) + else: + self._process_order_message(open_orders_dict[ex_order_id]) + + def _process_order_message(self, order_msg: Dict[str, Any]): + """ + Updates in-flight order and triggers cancellation or failure event if needed. + :param order_msg: The order response from either REST or web socket API (they are of the same format) + Example Orders: + REST request + { + "id" : "977f82aa-23dc-4c8b-982c-2ee7d2002882", + "clientOrderId" : null, + "symbol" : "BTC/USD", + "orderType" : "LIMIT", + "orderSide" : "BUY", + "quantity" : 0.1, + "price" : 54570, + "payFeesWithZoomToken" : false, + "orderStatus" : "PARTIALLY_FILLED", + "timestamp" : "2021-03-24T04:07:26.260253Z", + "executions" : + [ + { + "id" : "38761582-2b37-4e27-a561-434981d21a96", + "executionType" : "PARTIAL_FILL", + "orderStatus" : "PARTIALLY_FILLED", + "lastPrice" : 54570, + "averagePrice" : 54570, + "lastQuantity" : 0.01, + "leavesQuantity" : 0.09, + "cumulativeQuantity" : 0.01, + "rejectReason" : null, + "timestamp" : "2021-03-24T04:07:44.503222Z" + } + ] + } + WS request + { + 'orderId': '962a2a54-fbcf-4d89-8f37-a8854020a823', + 'symbol': 'BTC/USD', 'orderType': 'LIMIT', + 'orderSide': 'BUY', + 'price': 5000, + 'quantity': 0.001, + 'executionType': 'CANCEL', + 'orderStatus': 'CANCELLED', + 'lastQuantity': 0, + 'leavesQuantity': 0, + 'cumulativeQuantity': 0, + 'transactTime': '2021-03-23T19:06:51.155520Z' + + ... Optional fields + + 'id': '4eb3f26c-91bd-4bd2-bacb-15b2f432c452', + "orderType": "LIMIT", + "lastPrice": 56518.7, + "averagePrice": 56518.7, + } + """ + # Looks like CoinZoom might support clientOrderId eventually so leaving this here for now. + # if order_msg.get('clientOrderId') is not None: + # client_order_id = order_msg["clientOrderId"] + # if client_order_id not in self._in_flight_orders: + # return + # tracked_order = self._in_flight_orders[client_order_id] + # else: + if "orderId" not in order_msg: + exchange_order_id = str(order_msg["id"]) + else: + exchange_order_id = str(order_msg["orderId"]) + tracked_orders = list(self._in_flight_orders.values()) + track_order = [o for o in tracked_orders if exchange_order_id == o.exchange_order_id] + if not track_order: + return + tracked_order = track_order[0] + + # Estimate fee + order_msg["trade_fee"] = self.estimate_fee_pct(tracked_order.order_type is OrderType.LIMIT_MAKER) + updated = tracked_order.update_with_order_update(order_msg) + # Call Update balances on every message to catch order create, fill and cancel. + safe_ensure_future(self._update_balances()) + + if updated: + safe_ensure_future(self._trigger_order_fill(tracked_order, order_msg)) + elif tracked_order.is_cancelled: + self.logger().info(f"Successfully cancelled order {tracked_order.client_order_id}.") + self.stop_tracking_order(tracked_order.client_order_id) + self.trigger_event(MarketEvent.OrderCancelled, + OrderCancelledEvent(self.current_timestamp, tracked_order.client_order_id)) + tracked_order.cancelled_event.set() + elif tracked_order.is_failure: + self.logger().info(f"The order {tracked_order.client_order_id} has failed according to order status API. ") + self.trigger_event(MarketEvent.OrderFailure, + MarketOrderFailureEvent( + self.current_timestamp, tracked_order.client_order_id, tracked_order.order_type)) + self.stop_tracking_order(tracked_order.client_order_id) + + async def _trigger_order_fill(self, + tracked_order: CoinzoomInFlightOrder, + update_msg: Dict[str, Any]): + self.trigger_event( + MarketEvent.OrderFilled, + OrderFilledEvent( + self.current_timestamp, + tracked_order.client_order_id, + tracked_order.trading_pair, + tracked_order.trade_type, + tracked_order.order_type, + Decimal(str(update_msg.get("averagePrice", update_msg.get("price", "0")))), + tracked_order.executed_amount_base, + TradeFee(percent=update_msg["trade_fee"]), + update_msg.get("exchange_trade_id", update_msg.get("id", update_msg.get("orderId"))) + ) + ) + if math.isclose(tracked_order.executed_amount_base, tracked_order.amount) or \ + tracked_order.executed_amount_base >= tracked_order.amount or \ + tracked_order.is_done: + tracked_order.last_state = "FILLED" + self.logger().info(f"The {tracked_order.trade_type.name} order " + f"{tracked_order.client_order_id} has completed " + f"according to order status API.") + event_tag = MarketEvent.BuyOrderCompleted if tracked_order.trade_type is TradeType.BUY \ + else MarketEvent.SellOrderCompleted + event_class = BuyOrderCompletedEvent if tracked_order.trade_type is TradeType.BUY \ + else SellOrderCompletedEvent + await asyncio.sleep(0.1) + self.trigger_event(event_tag, + event_class(self.current_timestamp, + tracked_order.client_order_id, + tracked_order.base_asset, + tracked_order.quote_asset, + tracked_order.fee_asset, + tracked_order.executed_amount_base, + tracked_order.executed_amount_quote, + tracked_order.fee_paid, + tracked_order.order_type)) + self.stop_tracking_order(tracked_order.client_order_id) + + def _process_balance_message(self, balance_update): + local_asset_names = set(self._account_balances.keys()) + remote_asset_names = set() + for account in balance_update: + asset_name = account["currency"] + total_bal = Decimal(str(account["totalBalance"])) + self._account_available_balances[asset_name] = total_bal + Decimal(str(account["reservedBalance"])) + self._account_balances[asset_name] = total_bal + remote_asset_names.add(asset_name) + + asset_names_to_remove = local_asset_names.difference(remote_asset_names) + for asset_name in asset_names_to_remove: + del self._account_available_balances[asset_name] + del self._account_balances[asset_name] + + async def cancel_all(self, timeout_seconds: float) -> List[CancellationResult]: + """ + Cancels all in-flight orders and waits for cancellation results. + Used by bot's top level stop and exit commands (cancelling outstanding orders on exit) + :param timeout_seconds: The timeout at which the operation will be canceled. + :returns List of CancellationResult which indicates whether each order is successfully cancelled. + """ + # if self._trading_pairs is None: + # raise Exception("cancel_all can only be used when trading_pairs are specified.") + open_orders = [o for o in self._in_flight_orders.values() if not o.is_done] + if len(open_orders) == 0: + return [] + tasks = [self._execute_cancel(o.trading_pair, o.client_order_id) for o in open_orders] + cancellation_results = [] + try: + async with timeout(timeout_seconds): + cancellation_results = await safe_gather(*tasks, return_exceptions=False) + except Exception: + self.logger().network( + "Unexpected error cancelling orders.", exc_info=True, + app_warning_msg=(f"Failed to cancel all orders on {Constants.EXCHANGE_NAME}. " + "Check API key and network connection.") + ) + return cancellation_results + + def tick(self, timestamp: float): + """ + Is called automatically by the clock for each clock's tick (1 second by default). + It checks if status polling task is due for execution. + """ + now = time.time() + poll_interval = (Constants.SHORT_POLL_INTERVAL + if now - self._user_stream_tracker.last_recv_time > 60.0 + else Constants.LONG_POLL_INTERVAL) + last_tick = int(self._last_timestamp / poll_interval) + current_tick = int(timestamp / poll_interval) + if current_tick > last_tick: + if not self._poll_notifier.is_set(): + self._poll_notifier.set() + self._last_timestamp = timestamp + + def get_fee(self, + base_currency: str, + quote_currency: str, + order_type: OrderType, + order_side: TradeType, + amount: Decimal, + price: Decimal = s_decimal_NaN) -> TradeFee: + """ + To get trading fee, this function is simplified by using fee override configuration. Most parameters to this + function are ignore except order_type. Use OrderType.LIMIT_MAKER to specify you want trading fee for + maker order. + """ + is_maker = order_type is OrderType.LIMIT_MAKER + return TradeFee(percent=self.estimate_fee_pct(is_maker)) + + async def _iter_user_event_queue(self) -> AsyncIterable[Dict[str, any]]: + while True: + try: + yield await self._user_stream_tracker.user_stream.get() + except asyncio.CancelledError: + raise + except Exception: + self.logger().network( + "Unknown error. Retrying after 1 seconds.", exc_info=True, + app_warning_msg=(f"Could not fetch user events from {Constants.EXCHANGE_NAME}. " + "Check API key and network connection.")) + await asyncio.sleep(1.0) + + async def _user_stream_event_listener(self): + """ + Listens to message in _user_stream_tracker.user_stream queue. The messages are put in by + CoinzoomAPIUserStreamDataSource. + """ + async for event_message in self._iter_user_event_queue(): + try: + event_methods = [ + Constants.WS_METHODS["USER_ORDERS"], + Constants.WS_METHODS["USER_ORDERS_CANCEL"], + ] + + msg_keys = list(event_message.keys()) if event_message is not None else [] + + method_key = [key for key in msg_keys if key in event_methods] + + if len(method_key) != 1: + continue + + method: str = method_key[0] + + if method == Constants.WS_METHODS["USER_ORDERS"]: + self._process_order_message(event_message[method]) + elif method == Constants.WS_METHODS["USER_ORDERS_CANCEL"]: + self._process_order_message(event_message[method]) + except asyncio.CancelledError: + raise + except Exception: + self.logger().error("Unexpected error in user stream listener loop.", exc_info=True) + await asyncio.sleep(5.0) + + # This is currently unused, but looks like a future addition. + async def get_open_orders(self) -> List[OpenOrder]: + tracked_orders = list(self._in_flight_orders.values()) + api_params = { + 'symbol': None, + 'orderSide': None, + 'orderStatuses': ["NEW", "PARTIALLY_FILLED"], + 'size': 500, + 'bookmarkOrderId': None + } + result = await self._api_request("POST", Constants.ENDPOINT["USER_ORDERS"], api_params, is_auth_required=True) + ret_val = [] + for order in result: + exchange_order_id = str(order["id"]) + # CoinZoom doesn't support client order ids yet so we must find it from the tracked orders. + track_order = [o for o in tracked_orders if exchange_order_id == o.exchange_order_id] + if not track_order or len(track_order) < 1: + # Skip untracked orders + continue + client_order_id = track_order[0].client_order_id + # if Constants.HBOT_BROKER_ID not in order["clientOrderId"]: + # continue + if order["orderType"] != OrderType.LIMIT.name.upper(): + self.logger().info(f"Unsupported order type found: {order['type']}") + # Skip and report non-limit orders + continue + ret_val.append( + OpenOrder( + client_order_id=client_order_id, + trading_pair=convert_from_exchange_trading_pair(order["symbol"]), + price=Decimal(str(order["price"])), + amount=Decimal(str(order["quantity"])), + executed_amount=Decimal(str(order["cumQuantity"])), + status=order["orderStatus"], + order_type=OrderType.LIMIT, + is_buy=True if order["orderSide"].lower() == TradeType.BUY.name.lower() else False, + time=str_date_to_ts(order["timestamp"]), + exchange_order_id=order["id"] + ) + ) + return ret_val diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_in_flight_order.py b/hummingbot/connector/exchange/coinzoom/coinzoom_in_flight_order.py new file mode 100644 index 0000000000..61da8fdb0b --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_in_flight_order.py @@ -0,0 +1,163 @@ +from decimal import Decimal +from typing import ( + Any, + Dict, + Optional, +) +import asyncio +from hummingbot.core.event.events import ( + OrderType, + TradeType +) +from hummingbot.connector.in_flight_order_base import InFlightOrderBase + +s_decimal_0 = Decimal(0) + + +class CoinzoomInFlightOrder(InFlightOrderBase): + def __init__(self, + client_order_id: str, + exchange_order_id: Optional[str], + trading_pair: str, + order_type: OrderType, + trade_type: TradeType, + price: Decimal, + amount: Decimal, + initial_state: str = "NEW"): + super().__init__( + client_order_id, + exchange_order_id, + trading_pair, + order_type, + trade_type, + price, + amount, + initial_state, + ) + self.trade_id_set = set() + self.cancelled_event = asyncio.Event() + + @property + def is_done(self) -> bool: + return self.last_state in {"FILLED", "CANCELLED", "REJECTED"} + + @property + def is_failure(self) -> bool: + return self.last_state in {"REJECTED"} + + @property + def is_cancelled(self) -> bool: + return self.last_state in {"CANCELLED"} + + @classmethod + def from_json(cls, data: Dict[str, Any]) -> InFlightOrderBase: + """ + :param data: json data from API + :return: formatted InFlightOrder + """ + retval = CoinzoomInFlightOrder( + data["client_order_id"], + data["exchange_order_id"], + data["trading_pair"], + getattr(OrderType, data["order_type"]), + getattr(TradeType, data["trade_type"]), + Decimal(data["price"]), + Decimal(data["amount"]), + data["last_state"] + ) + retval.executed_amount_base = Decimal(data["executed_amount_base"]) + retval.executed_amount_quote = Decimal(data["executed_amount_quote"]) + retval.fee_asset = data["fee_asset"] + retval.fee_paid = Decimal(data["fee_paid"]) + retval.last_state = data["last_state"] + return retval + + def update_with_order_update(self, order_update: Dict[str, Any]) -> bool: + """ + Updates the in flight order with order update (from private/get-order-detail end point) + return: True if the order gets updated otherwise False + Example Orders: + REST request + { + "id" : "977f82aa-23dc-4c8b-982c-2ee7d2002882", + "clientOrderId" : null, + "symbol" : "BTC/USD", + "orderType" : "LIMIT", + "orderSide" : "BUY", + "quantity" : 0.1, + "price" : 54570, + "payFeesWithZoomToken" : false, + "orderStatus" : "PARTIALLY_FILLED", + "timestamp" : "2021-03-24T04:07:26.260253Z", + "executions" : + [ + { + "id" : "38761582-2b37-4e27-a561-434981d21a96", + "executionType" : "PARTIAL_FILL", + "orderStatus" : "PARTIALLY_FILLED", + "lastPrice" : 54570, + "averagePrice" : 54570, + "lastQuantity" : 0.01, + "leavesQuantity" : 0.09, + "cumulativeQuantity" : 0.01, + "rejectReason" : null, + "timestamp" : "2021-03-24T04:07:44.503222Z" + } + ] + } + WS request + { + 'id': '4eb3f26c-91bd-4bd2-bacb-15b2f432c452', + 'orderId': '962a2a54-fbcf-4d89-8f37-a8854020a823', + 'symbol': 'BTC/USD', 'orderType': 'LIMIT', + 'orderSide': 'BUY', + 'price': 5000, + 'quantity': 0.001, + 'executionType': 'CANCEL', + 'orderStatus': 'CANCELLED', + 'lastQuantity': 0, + 'leavesQuantity': 0, + 'cumulativeQuantity': 0, + 'transactTime': '2021-03-23T19:06:51.155520Z' + } + """ + # Update order execution status + self.last_state = order_update["orderStatus"] + + if 'cumulativeQuantity' not in order_update and 'executions' not in order_update: + return False + + trades = order_update.get('executions') + if trades is not None: + new_trades = False + for trade in trades: + trade_id = str(trade["timestamp"]) + if trade_id not in self.trade_id_set: + self.trade_id_set.add(trade_id) + order_update["exchange_trade_id"] = trade.get("id") + # Add executed amounts + executed_price = Decimal(str(trade.get("lastPrice", "0"))) + self.executed_amount_base += Decimal(str(trade["lastQuantity"])) + self.executed_amount_quote += executed_price * self.executed_amount_base + # Set new trades flag + new_trades = True + if not new_trades: + # trades already recorded + return False + else: + trade_id = str(order_update["transactTime"]) + if trade_id in self.trade_id_set: + # trade already recorded + return False + self.trade_id_set.add(trade_id) + # Set executed amounts + executed_price = Decimal(str(order_update.get("averagePrice", order_update.get("price", "0")))) + self.executed_amount_base = Decimal(str(order_update["cumulativeQuantity"])) + self.executed_amount_quote = executed_price * self.executed_amount_base + if self.executed_amount_base <= s_decimal_0: + # No trades executed yet. + return False + self.fee_paid += order_update.get("trade_fee") * self.executed_amount_base + if not self.fee_asset: + self.fee_asset = self.quote_asset + return True diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_order_book.py b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book.py new file mode 100644 index 0000000000..e771d48cf3 --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python + +import logging +from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants + +from sqlalchemy.engine import RowProxy +from typing import ( + Optional, + Dict, + List, Any) +from hummingbot.logger import HummingbotLogger +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.core.data_type.order_book_message import ( + OrderBookMessage, OrderBookMessageType +) +from hummingbot.connector.exchange.coinzoom.coinzoom_order_book_message import CoinzoomOrderBookMessage +from .coinzoom_utils import ( + convert_from_exchange_trading_pair, + str_date_to_ts, +) + +_logger = None + + +class CoinzoomOrderBook(OrderBook): + @classmethod + def logger(cls) -> HummingbotLogger: + global _logger + if _logger is None: + _logger = logging.getLogger(__name__) + return _logger + + @classmethod + def snapshot_message_from_exchange(cls, + msg: Dict[str, any], + timestamp: float, + metadata: Optional[Dict] = None): + """ + Convert json snapshot data into standard OrderBookMessage format + :param msg: json snapshot data from live web socket stream + :param timestamp: timestamp attached to incoming data + :return: CoinzoomOrderBookMessage + """ + + if metadata: + msg.update(metadata) + + return CoinzoomOrderBookMessage( + message_type=OrderBookMessageType.SNAPSHOT, + content=msg, + timestamp=timestamp + ) + + @classmethod + def snapshot_message_from_db(cls, record: RowProxy, metadata: Optional[Dict] = None): + """ + *used for backtesting + Convert a row of snapshot data into standard OrderBookMessage format + :param record: a row of snapshot data from the database + :return: CoinzoomOrderBookMessage + """ + return CoinzoomOrderBookMessage( + message_type=OrderBookMessageType.SNAPSHOT, + content=record.json, + timestamp=record.timestamp + ) + + @classmethod + def diff_message_from_exchange(cls, + msg: Dict[str, any], + timestamp: Optional[float] = None, + metadata: Optional[Dict] = None): + """ + Convert json diff data into standard OrderBookMessage format + :param msg: json diff data from live web socket stream + :param timestamp: timestamp attached to incoming data + :return: CoinzoomOrderBookMessage + """ + + if metadata: + msg.update(metadata) + + return CoinzoomOrderBookMessage( + message_type=OrderBookMessageType.DIFF, + content=msg, + timestamp=timestamp + ) + + @classmethod + def diff_message_from_db(cls, record: RowProxy, metadata: Optional[Dict] = None): + """ + *used for backtesting + Convert a row of diff data into standard OrderBookMessage format + :param record: a row of diff data from the database + :return: CoinzoomOrderBookMessage + """ + return CoinzoomOrderBookMessage( + message_type=OrderBookMessageType.DIFF, + content=record.json, + timestamp=record.timestamp + ) + + @classmethod + def trade_message_from_exchange(cls, + msg: Dict[str, Any], + timestamp: Optional[float] = None, + metadata: Optional[Dict] = None): + """ + Convert a trade data into standard OrderBookMessage format + :param record: a trade data from the database + :return: CoinzoomOrderBookMessage + """ + + trade_msg = { + "trade_type": msg[4], + "price": msg[1], + "amount": msg[2], + "trading_pair": convert_from_exchange_trading_pair(msg[0]) + } + trade_timestamp = str_date_to_ts(msg[3]) + + return CoinzoomOrderBookMessage( + message_type=OrderBookMessageType.TRADE, + content=trade_msg, + timestamp=trade_timestamp + ) + + @classmethod + def trade_message_from_db(cls, record: RowProxy, metadata: Optional[Dict] = None): + """ + *used for backtesting + Convert a row of trade data into standard OrderBookMessage format + :param record: a row of trade data from the database + :return: CoinzoomOrderBookMessage + """ + return CoinzoomOrderBookMessage( + message_type=OrderBookMessageType.TRADE, + content=record.json, + timestamp=record.timestamp + ) + + @classmethod + def from_snapshot(cls, snapshot: OrderBookMessage): + raise NotImplementedError(Constants.EXCHANGE_NAME + " order book needs to retain individual order data.") + + @classmethod + def restore_from_snapshot_and_diffs(self, snapshot: OrderBookMessage, diffs: List[OrderBookMessage]): + raise NotImplementedError(Constants.EXCHANGE_NAME + " order book needs to retain individual order data.") diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_message.py b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_message.py new file mode 100644 index 0000000000..d6bc00541d --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_message.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python + +from typing import ( + Dict, + Optional, +) + +from hummingbot.core.data_type.order_book_message import ( + OrderBookMessage, + OrderBookMessageType, +) +from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants + + +class CoinzoomOrderBookMessage(OrderBookMessage): + def __new__( + cls, + message_type: OrderBookMessageType, + content: Dict[str, any], + timestamp: Optional[float] = None, + *args, + **kwargs, + ): + if timestamp is None: + if message_type is OrderBookMessageType.SNAPSHOT: + raise ValueError("timestamp must not be None when initializing snapshot messages.") + timestamp = content["timestamp"] + + return super(CoinzoomOrderBookMessage, cls).__new__( + cls, message_type, content, timestamp=timestamp, *args, **kwargs + ) + + @property + def update_id(self) -> int: + if self.type in [OrderBookMessageType.DIFF, OrderBookMessageType.SNAPSHOT]: + return self.timestamp + else: + return -1 + + @property + def trade_id(self) -> int: + if self.type is OrderBookMessageType.TRADE: + return self.timestamp + return -1 + + @property + def trading_pair(self) -> str: + return self.content["trading_pair"] + + # The `asks` and `bids` properties are only used in the methods below. + # They are all replaced or unused in this connector: + # OrderBook.restore_from_snapshot_and_diffs + # OrderBookTracker._track_single_book + # MockAPIOrderBookDataSource.get_tracking_pairs + @property + def asks(self): + raise NotImplementedError(Constants.EXCHANGE_NAME + " order book uses active_order_tracker.") + + @property + def bids(self): + raise NotImplementedError(Constants.EXCHANGE_NAME + " order book uses active_order_tracker.") + + def __eq__(self, other) -> bool: + return self.type == other.type and self.timestamp == other.timestamp + + def __lt__(self, other) -> bool: + if self.timestamp != other.timestamp: + return self.timestamp < other.timestamp + else: + """ + If timestamp is the same, the ordering is snapshot < diff < trade + """ + return self.type.value < other.type.value diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_tracker.py b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_tracker.py new file mode 100644 index 0000000000..c81ca9a7bd --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_tracker.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +import asyncio +import bisect +import logging +from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants +import time + +from collections import defaultdict, deque +from typing import Optional, Dict, List, Deque +from hummingbot.core.data_type.order_book_message import OrderBookMessageType +from hummingbot.logger import HummingbotLogger +from hummingbot.core.data_type.order_book_tracker import OrderBookTracker +from hummingbot.connector.exchange.coinzoom.coinzoom_order_book_message import CoinzoomOrderBookMessage +from hummingbot.connector.exchange.coinzoom.coinzoom_active_order_tracker import CoinzoomActiveOrderTracker +from hummingbot.connector.exchange.coinzoom.coinzoom_api_order_book_data_source import CoinzoomAPIOrderBookDataSource +from hummingbot.connector.exchange.coinzoom.coinzoom_order_book import CoinzoomOrderBook + + +class CoinzoomOrderBookTracker(OrderBookTracker): + _logger: Optional[HummingbotLogger] = None + + @classmethod + def logger(cls) -> HummingbotLogger: + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + def __init__(self, trading_pairs: Optional[List[str]] = None,): + super().__init__(CoinzoomAPIOrderBookDataSource(trading_pairs), trading_pairs) + + self._ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop() + self._order_book_snapshot_stream: asyncio.Queue = asyncio.Queue() + self._order_book_diff_stream: asyncio.Queue = asyncio.Queue() + self._order_book_trade_stream: asyncio.Queue = asyncio.Queue() + self._process_msg_deque_task: Optional[asyncio.Task] = None + self._past_diffs_windows: Dict[str, Deque] = {} + self._order_books: Dict[str, CoinzoomOrderBook] = {} + self._saved_message_queues: Dict[str, Deque[CoinzoomOrderBookMessage]] = \ + defaultdict(lambda: deque(maxlen=1000)) + self._active_order_trackers: Dict[str, CoinzoomActiveOrderTracker] = defaultdict(CoinzoomActiveOrderTracker) + self._order_book_stream_listener_task: Optional[asyncio.Task] = None + self._order_book_trade_listener_task: Optional[asyncio.Task] = None + + @property + def exchange_name(self) -> str: + """ + Name of the current exchange + """ + return Constants.EXCHANGE_NAME + + async def _track_single_book(self, trading_pair: str): + """ + Update an order book with changes from the latest batch of received messages + """ + past_diffs_window: Deque[CoinzoomOrderBookMessage] = deque() + self._past_diffs_windows[trading_pair] = past_diffs_window + + message_queue: asyncio.Queue = self._tracking_message_queues[trading_pair] + order_book: CoinzoomOrderBook = self._order_books[trading_pair] + active_order_tracker: CoinzoomActiveOrderTracker = self._active_order_trackers[trading_pair] + + last_message_timestamp: float = time.time() + diff_messages_accepted: int = 0 + + while True: + try: + message: CoinzoomOrderBookMessage = None + saved_messages: Deque[CoinzoomOrderBookMessage] = self._saved_message_queues[trading_pair] + # Process saved messages first if there are any + if len(saved_messages) > 0: + message = saved_messages.popleft() + else: + message = await message_queue.get() + + if message.type is OrderBookMessageType.DIFF: + bids, asks = active_order_tracker.convert_diff_message_to_order_book_row(message) + order_book.apply_diffs(bids, asks, message.update_id) + past_diffs_window.append(message) + while len(past_diffs_window) > self.PAST_DIFF_WINDOW_SIZE: + past_diffs_window.popleft() + diff_messages_accepted += 1 + + # Output some statistics periodically. + now: float = time.time() + if int(now / 60.0) > int(last_message_timestamp / 60.0): + self.logger().debug(f"Processed {diff_messages_accepted} order book diffs for {trading_pair}.") + diff_messages_accepted = 0 + last_message_timestamp = now + elif message.type is OrderBookMessageType.SNAPSHOT: + past_diffs: List[CoinzoomOrderBookMessage] = list(past_diffs_window) + # only replay diffs later than snapshot, first update active order with snapshot then replay diffs + replay_position = bisect.bisect_right(past_diffs, message) + replay_diffs = past_diffs[replay_position:] + s_bids, s_asks = active_order_tracker.convert_snapshot_message_to_order_book_row(message) + order_book.apply_snapshot(s_bids, s_asks, message.update_id) + for diff_message in replay_diffs: + d_bids, d_asks = active_order_tracker.convert_diff_message_to_order_book_row(diff_message) + order_book.apply_diffs(d_bids, d_asks, diff_message.update_id) + + self.logger().debug(f"Processed order book snapshot for {trading_pair}.") + except asyncio.CancelledError: + raise + except Exception: + self.logger().network( + f"Unexpected error processing order book messages for {trading_pair}.", + exc_info=True, + app_warning_msg="Unexpected error processing order book messages. Retrying after 5 seconds." + ) + await asyncio.sleep(5.0) diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_tracker_entry.py b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_tracker_entry.py new file mode 100644 index 0000000000..94feda5275 --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_tracker_entry.py @@ -0,0 +1,21 @@ +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.core.data_type.order_book_tracker_entry import OrderBookTrackerEntry +from hummingbot.connector.exchange.coinzoom.coinzoom_active_order_tracker import CoinzoomActiveOrderTracker + + +class CoinzoomOrderBookTrackerEntry(OrderBookTrackerEntry): + def __init__( + self, trading_pair: str, timestamp: float, order_book: OrderBook, active_order_tracker: CoinzoomActiveOrderTracker + ): + self._active_order_tracker = active_order_tracker + super(CoinzoomOrderBookTrackerEntry, self).__init__(trading_pair, timestamp, order_book) + + def __repr__(self) -> str: + return ( + f"CoinzoomOrderBookTrackerEntry(trading_pair='{self._trading_pair}', timestamp='{self._timestamp}', " + f"order_book='{self._order_book}')" + ) + + @property + def active_order_tracker(self) -> CoinzoomActiveOrderTracker: + return self._active_order_tracker diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_user_stream_tracker.py b/hummingbot/connector/exchange/coinzoom/coinzoom_user_stream_tracker.py new file mode 100644 index 0000000000..79c7584d70 --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_user_stream_tracker.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python + +import asyncio +import logging +from typing import ( + Optional, + List, +) +from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource +from hummingbot.logger import HummingbotLogger +from hummingbot.core.data_type.user_stream_tracker import ( + UserStreamTracker +) +from hummingbot.core.utils.async_utils import ( + safe_ensure_future, + safe_gather, +) +from hummingbot.connector.exchange.coinzoom.coinzoom_api_user_stream_data_source import \ + CoinzoomAPIUserStreamDataSource +from hummingbot.connector.exchange.coinzoom.coinzoom_auth import CoinzoomAuth +from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants + + +class CoinzoomUserStreamTracker(UserStreamTracker): + _cbpust_logger: Optional[HummingbotLogger] = None + + @classmethod + def logger(cls) -> HummingbotLogger: + if cls._bust_logger is None: + cls._bust_logger = logging.getLogger(__name__) + return cls._bust_logger + + def __init__(self, + coinzoom_auth: Optional[CoinzoomAuth] = None, + trading_pairs: Optional[List[str]] = []): + super().__init__() + self._coinzoom_auth: CoinzoomAuth = coinzoom_auth + self._trading_pairs: List[str] = trading_pairs + self._ev_loop: asyncio.events.AbstractEventLoop = asyncio.get_event_loop() + self._data_source: Optional[UserStreamTrackerDataSource] = None + self._user_stream_tracking_task: Optional[asyncio.Task] = None + + @property + def data_source(self) -> UserStreamTrackerDataSource: + """ + *required + Initializes a user stream data source (user specific order diffs from live socket stream) + :return: OrderBookTrackerDataSource + """ + if not self._data_source: + self._data_source = CoinzoomAPIUserStreamDataSource( + coinzoom_auth=self._coinzoom_auth, + trading_pairs=self._trading_pairs + ) + return self._data_source + + @property + def exchange_name(self) -> str: + """ + *required + Name of the current exchange + """ + return Constants.EXCHANGE_NAME + + async def start(self): + """ + *required + Start all listeners and tasks + """ + self._user_stream_tracking_task = safe_ensure_future( + self.data_source.listen_for_user_stream(self._ev_loop, self._user_stream) + ) + await safe_gather(self._user_stream_tracking_task) diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_utils.py b/hummingbot/connector/exchange/coinzoom/coinzoom_utils.py new file mode 100644 index 0000000000..498ed56541 --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_utils.py @@ -0,0 +1,149 @@ +import aiohttp +import asyncio +import random +from dateutil.parser import parse as dateparse +from typing import ( + Any, + Dict, + Optional, +) + +from hummingbot.core.utils.tracking_nonce import get_tracking_nonce +from hummingbot.client.config.config_var import ConfigVar +from hummingbot.client.config.config_methods import using_exchange +from .coinzoom_constants import Constants + + +CENTRALIZED = True + +EXAMPLE_PAIR = "BTC-USD" + +DEFAULT_FEES = [0.2, 0.26] + + +class CoinzoomAPIError(IOError): + def __init__(self, error_payload: Dict[str, Any]): + super().__init__(str(error_payload)) + self.error_payload = error_payload + + +# convert date string to timestamp +def str_date_to_ts(date: str) -> int: + return int(dateparse(date).timestamp() * 1e3) + + +# Request ID class +class RequestId: + """ + Generate request ids + """ + _request_id: int = 0 + + @classmethod + def generate_request_id(cls) -> int: + return get_tracking_nonce() + + +def convert_from_exchange_trading_pair(ex_trading_pair: str) -> Optional[str]: + # CoinZoom uses uppercase (BTC/USDT) + return ex_trading_pair.replace("/", "-") + + +def convert_to_exchange_trading_pair(hb_trading_pair: str, alternative: bool = False) -> str: + # CoinZoom uses uppercase (BTCUSDT) + if alternative: + return hb_trading_pair.replace("-", "_").upper() + else: + return hb_trading_pair.replace("-", "/").upper() + + +def get_new_client_order_id(is_buy: bool, trading_pair: str) -> str: + side = "B" if is_buy else "S" + symbols = trading_pair.split("-") + base = symbols[0].upper() + quote = symbols[1].upper() + base_str = f"{base[0]}{base[-1]}" + quote_str = f"{quote[0]}{quote[-1]}" + return f"{Constants.HBOT_BROKER_ID}{side}{base_str}{quote_str}{get_tracking_nonce()}" + + +def retry_sleep_time(try_count: int) -> float: + random.seed() + randSleep = 1 + float(random.randint(1, 10) / 100) + return float(2 + float(randSleep * (1 + (try_count ** try_count)))) + + +async def aiohttp_response_with_errors(request_coroutine): + http_status, parsed_response, request_errors = None, None, False + try: + async with request_coroutine as response: + http_status = response.status + try: + parsed_response = await response.json() + except Exception: + if response.status not in [204]: + request_errors = True + try: + parsed_response = str(await response.read()) + if len(parsed_response) > 100: + parsed_response = f"{parsed_response[:100]} ... (truncated)" + except Exception: + pass + TempFailure = (parsed_response is None or + (response.status not in [200, 201, 204] and "error" not in parsed_response)) + if TempFailure: + parsed_response = response.reason if parsed_response is None else parsed_response + request_errors = True + except Exception: + request_errors = True + return http_status, parsed_response, request_errors + + +async def api_call_with_retries(method, + endpoint, + params: Optional[Dict[str, Any]] = None, + shared_client=None, + try_count: int = 0) -> Dict[str, Any]: + url = f"{Constants.REST_URL}/{endpoint}" + headers = {"Content-Type": "application/json", "User-Agent": "hummingbot"} + http_client = shared_client if shared_client is not None else aiohttp.ClientSession() + # Build request coro + response_coro = http_client.request(method=method.upper(), url=url, headers=headers, + params=params, timeout=Constants.API_CALL_TIMEOUT) + http_status, parsed_response, request_errors = await aiohttp_response_with_errors(response_coro) + if shared_client is None: + await http_client.close() + if request_errors or parsed_response is None: + if try_count < Constants.API_MAX_RETRIES: + try_count += 1 + time_sleep = retry_sleep_time(try_count) + print(f"Error fetching data from {url}. HTTP status is {http_status}. " + f"Retrying in {time_sleep:.0f}s.") + await asyncio.sleep(time_sleep) + return await api_call_with_retries(method=method, endpoint=endpoint, params=params, + shared_client=shared_client, try_count=try_count) + else: + raise CoinzoomAPIError({"error": parsed_response, "status": http_status}) + return parsed_response + + +KEYS = { + "coinzoom_api_key": + ConfigVar(key="coinzoom_api_key", + prompt=f"Enter your {Constants.EXCHANGE_NAME} API key >>> ", + required_if=using_exchange("coinzoom"), + is_secure=True, + is_connect_key=True), + "coinzoom_secret_key": + ConfigVar(key="coinzoom_secret_key", + prompt=f"Enter your {Constants.EXCHANGE_NAME} secret key >>> ", + required_if=using_exchange("coinzoom"), + is_secure=True, + is_connect_key=True), + "coinzoom_username": + ConfigVar(key="coinzoom_username", + prompt=f"Enter your {Constants.EXCHANGE_NAME} ZoomMe username >>> ", + required_if=using_exchange("coinzoom"), + is_secure=True, + is_connect_key=True), +} diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_websocket.py b/hummingbot/connector/exchange/coinzoom/coinzoom_websocket.py new file mode 100644 index 0000000000..1e233e2054 --- /dev/null +++ b/hummingbot/connector/exchange/coinzoom/coinzoom_websocket.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python +import asyncio +import logging +import websockets +import json +from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants + + +from typing import ( + Any, + AsyncIterable, + Dict, + List, + Optional, +) +from websockets.exceptions import ConnectionClosed +from hummingbot.logger import HummingbotLogger +from hummingbot.connector.exchange.coinzoom.coinzoom_auth import CoinzoomAuth + +# reusable websocket class +# ToDo: We should eventually remove this class, and instantiate web socket connection normally (see Binance for example) + + +class CoinzoomWebsocket(): + _logger: Optional[HummingbotLogger] = None + + @classmethod + def logger(cls) -> HummingbotLogger: + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + def __init__(self, + auth: Optional[CoinzoomAuth] = None): + self._auth: Optional[CoinzoomAuth] = auth + self._isPrivate = True if self._auth is not None else False + self._WS_URL = Constants.WS_PRIVATE_URL if self._isPrivate else Constants.WS_PUBLIC_URL + self._client: Optional[websockets.WebSocketClientProtocol] = None + self._is_subscribed = False + + @property + def is_subscribed(self): + return self._is_subscribed + + # connect to exchange + async def connect(self): + # if auth class was passed into websocket class + # we need to emit authenticated requests + extra_headers = self._auth.get_headers() if self._isPrivate else None + self._client = await websockets.connect(self._WS_URL, extra_headers=extra_headers) + + return self._client + + # disconnect from exchange + async def disconnect(self): + if self._client is None: + return + + await self._client.close() + + # receive & parse messages + async def _messages(self) -> AsyncIterable[Any]: + try: + while True: + try: + raw_msg_str: str = await asyncio.wait_for(self._client.recv(), timeout=Constants.MESSAGE_TIMEOUT) + try: + msg = json.loads(raw_msg_str) + + # CoinZoom doesn't support ping or heartbeat messages. + # Can handle them here if that changes - use `safe_ensure_future`. + + # Check response for a subscribed/unsubscribed message; + result: List[str] = list([d['result'] + for k, d in msg.items() + if (isinstance(d, dict) and d.get('result') is not None)]) + + if len(result): + if result[0] == 'subscribed': + self._is_subscribed = True + elif result[0] == 'unsubscribed': + self._is_subscribed = False + yield None + else: + yield msg + except ValueError: + continue + except asyncio.TimeoutError: + await asyncio.wait_for(self._client.ping(), timeout=Constants.PING_TIMEOUT) + except asyncio.TimeoutError: + self.logger().warning("WebSocket ping timed out. Going to reconnect...") + return + except ConnectionClosed: + return + finally: + await self.disconnect() + + # emit messages + async def _emit(self, method: str, action: str, data: Optional[Dict[str, Any]] = {}) -> int: + payload = { + method: { + "action": action, + **data + } + } + return await self._client.send(json.dumps(payload)) + + # request via websocket + async def request(self, method: str, action: str, data: Optional[Dict[str, Any]] = {}) -> int: + return await self._emit(method, action, data) + + # subscribe to a method + async def subscribe(self, + streams: Optional[Dict[str, Any]] = {}) -> int: + for stream, stream_dict in streams.items(): + if self._isPrivate: + stream_dict = {**stream_dict, **self._auth.get_ws_params()} + await self.request(stream, "subscribe", stream_dict) + return True + + # unsubscribe to a method + async def unsubscribe(self, + streams: Optional[Dict[str, Any]] = {}) -> int: + for stream, stream_dict in streams.items(): + if self._isPrivate: + stream_dict = {**stream_dict, **self._auth.get_ws_params()} + await self.request(stream, "unsubscribe", stream_dict) + return True + + # listen to messages by method + async def on_message(self) -> AsyncIterable[Any]: + async for msg in self._messages(): + yield msg diff --git a/hummingbot/templates/conf_fee_overrides_TEMPLATE.yml b/hummingbot/templates/conf_fee_overrides_TEMPLATE.yml index fa3324522f..b8e79b7993 100644 --- a/hummingbot/templates/conf_fee_overrides_TEMPLATE.yml +++ b/hummingbot/templates/conf_fee_overrides_TEMPLATE.yml @@ -17,6 +17,9 @@ beaxy_taker_fee: coinbase_pro_maker_fee: coinbase_pro_taker_fee: +coinzoom_maker_fee: +coinzoom_taker_fee: + dydx_maker_fee: dydx_taker_fee: diff --git a/hummingbot/templates/conf_global_TEMPLATE.yml b/hummingbot/templates/conf_global_TEMPLATE.yml index 84946fb27f..ad833aa312 100644 --- a/hummingbot/templates/conf_global_TEMPLATE.yml +++ b/hummingbot/templates/conf_global_TEMPLATE.yml @@ -34,6 +34,10 @@ coinbase_pro_api_key: null coinbase_pro_secret_key: null coinbase_pro_passphrase: null +coinzoom_api_key: null +coinzoom_secret_key: null +coinzoom_username: null + dydx_eth_private_key: null dydx_node_address: null diff --git a/setup.py b/setup.py index fc70445f2d..08ece91061 100755 --- a/setup.py +++ b/setup.py @@ -56,6 +56,7 @@ def main(): "hummingbot.connector.exchange.bittrex", "hummingbot.connector.exchange.bamboo_relay", "hummingbot.connector.exchange.coinbase_pro", + "hummingbot.connector.exchange.coinzoom", "hummingbot.connector.exchange.dydx", "hummingbot.connector.exchange.huobi", "hummingbot.connector.exchange.radar_relay", diff --git a/test/connector/exchange/coinzoom/.gitignore b/test/connector/exchange/coinzoom/.gitignore new file mode 100644 index 0000000000..23d9952b8c --- /dev/null +++ b/test/connector/exchange/coinzoom/.gitignore @@ -0,0 +1 @@ +backups \ No newline at end of file diff --git a/test/connector/exchange/coinzoom/__init__.py b/test/connector/exchange/coinzoom/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/connector/exchange/coinzoom/test_coinzoom_auth.py b/test/connector/exchange/coinzoom/test_coinzoom_auth.py new file mode 100644 index 0000000000..f2573e4460 --- /dev/null +++ b/test/connector/exchange/coinzoom/test_coinzoom_auth.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +import sys +import asyncio +import unittest +import aiohttp +import conf +import logging +from async_timeout import timeout +from os.path import join, realpath +from typing import Dict, Any +from hummingbot.connector.exchange.coinzoom.coinzoom_auth import CoinzoomAuth +from hummingbot.connector.exchange.coinzoom.coinzoom_websocket import CoinzoomWebsocket +from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL +from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants + +sys.path.insert(0, realpath(join(__file__, "../../../../../"))) +logging.basicConfig(level=METRICS_LOG_LEVEL) + + +class TestAuth(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop() + api_key = conf.coinzoom_api_key + secret_key = conf.coinzoom_secret_key + api_username = conf.coinzoom_username + cls.auth = CoinzoomAuth(api_key, secret_key, api_username) + + async def rest_auth(self) -> Dict[Any, Any]: + endpoint = Constants.ENDPOINT['USER_BALANCES'] + headers = self.auth.get_headers() + response = await aiohttp.ClientSession().get(f"{Constants.REST_URL}/{endpoint}", headers=headers) + return await response.json() + + async def ws_auth(self) -> Dict[Any, Any]: + ws = CoinzoomWebsocket(self.auth) + await ws.connect() + user_ws_streams = {Constants.WS_SUB["USER_ORDERS_TRADES"]: {}} + async with timeout(30): + await ws.subscribe(user_ws_streams) + async for response in ws.on_message(): + if ws.is_subscribed: + return True + return False + + def test_rest_auth(self): + result = self.ev_loop.run_until_complete(self.rest_auth()) + if len(result) == 0 or "currency" not in result[0].keys(): + print(f"Unexpected response for API call: {result}") + assert "currency" in result[0].keys() + + def test_ws_auth(self): + subscribed = self.ev_loop.run_until_complete(self.ws_auth()) + assert subscribed is True diff --git a/test/connector/exchange/coinzoom/test_coinzoom_exchange.py b/test/connector/exchange/coinzoom/test_coinzoom_exchange.py new file mode 100644 index 0000000000..979b4651b6 --- /dev/null +++ b/test/connector/exchange/coinzoom/test_coinzoom_exchange.py @@ -0,0 +1,442 @@ +from os.path import join, realpath +import sys; sys.path.insert(0, realpath(join(__file__, "../../../../../"))) +import asyncio +import logging +from decimal import Decimal +import unittest +import contextlib +import time +import os +from typing import List +import conf +import math +from async_timeout import timeout + +from hummingbot.core.clock import Clock, ClockMode +from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL +from hummingbot.core.utils.async_utils import safe_gather, safe_ensure_future +from hummingbot.core.event.event_logger import EventLogger +from hummingbot.core.event.events import ( + BuyOrderCompletedEvent, + BuyOrderCreatedEvent, + MarketEvent, + OrderFilledEvent, + OrderType, + SellOrderCompletedEvent, + SellOrderCreatedEvent, + OrderCancelledEvent +) +from hummingbot.model.sql_connection_manager import ( + SQLConnectionManager, + SQLConnectionType +) +from hummingbot.model.market_state import MarketState +from hummingbot.model.order import Order +from hummingbot.model.trade_fill import TradeFill +from hummingbot.connector.markets_recorder import MarketsRecorder +from hummingbot.connector.exchange.coinzoom.coinzoom_exchange import CoinzoomExchange + +logging.basicConfig(level=METRICS_LOG_LEVEL) + +API_KEY = conf.coinzoom_api_key +API_SECRET = conf.coinzoom_secret_key +API_USERNAME = conf.coinzoom_username + + +class CoinzoomExchangeUnitTest(unittest.TestCase): + events: List[MarketEvent] = [ + MarketEvent.BuyOrderCompleted, + MarketEvent.SellOrderCompleted, + MarketEvent.OrderFilled, + MarketEvent.TransactionFailure, + MarketEvent.BuyOrderCreated, + MarketEvent.SellOrderCreated, + MarketEvent.OrderCancelled, + MarketEvent.OrderFailure + ] + connector: CoinzoomExchange + event_logger: EventLogger + trading_pair = "BTC-USD" + base_token, quote_token = trading_pair.split("-") + stack: contextlib.ExitStack + + @classmethod + def setUpClass(cls): + global MAINNET_RPC_URL + + cls.ev_loop = asyncio.get_event_loop() + + cls.clock: Clock = Clock(ClockMode.REALTIME) + cls.connector: CoinzoomExchange = CoinzoomExchange( + coinzoom_api_key=API_KEY, + coinzoom_secret_key=API_SECRET, + coinzoom_username=API_USERNAME, + trading_pairs=[cls.trading_pair], + trading_required=True + ) + print("Initializing Coinzoom market... this will take about a minute.") + cls.clock.add_iterator(cls.connector) + cls.stack: contextlib.ExitStack = contextlib.ExitStack() + cls._clock = cls.stack.enter_context(cls.clock) + cls.ev_loop.run_until_complete(cls.wait_til_ready()) + print("Ready.") + + @classmethod + def tearDownClass(cls) -> None: + cls.stack.close() + + @classmethod + async def wait_til_ready(cls, connector = None): + if connector is None: + connector = cls.connector + async with timeout(90): + while True: + now = time.time() + next_iteration = now // 1.0 + 1 + if connector.ready: + break + else: + await cls._clock.run_til(next_iteration) + await asyncio.sleep(1.0) + + def setUp(self): + self.db_path: str = realpath(join(__file__, "../connector_test.sqlite")) + try: + os.unlink(self.db_path) + except FileNotFoundError: + pass + + self.event_logger = EventLogger() + for event_tag in self.events: + self.connector.add_listener(event_tag, self.event_logger) + + def tearDown(self): + for event_tag in self.events: + self.connector.remove_listener(event_tag, self.event_logger) + self.event_logger = None + + async def run_parallel_async(self, *tasks): + future: asyncio.Future = safe_ensure_future(safe_gather(*tasks)) + while not future.done(): + now = time.time() + next_iteration = now // 1.0 + 1 + await self._clock.run_til(next_iteration) + await asyncio.sleep(1.0) + return future.result() + + def run_parallel(self, *tasks): + return self.ev_loop.run_until_complete(self.run_parallel_async(*tasks)) + + def _place_order(self, is_buy, amount, order_type, price, ex_order_id) -> str: + if is_buy: + cl_order_id = self.connector.buy(self.trading_pair, amount, order_type, price) + else: + cl_order_id = self.connector.sell(self.trading_pair, amount, order_type, price) + return cl_order_id + + def _cancel_order(self, cl_order_id, connector=None): + if connector is None: + connector = self.connector + return connector.cancel(self.trading_pair, cl_order_id) + + def test_estimate_fee(self): + maker_fee = self.connector.estimate_fee_pct(True) + self.assertAlmostEqual(maker_fee, Decimal("0.002")) + taker_fee = self.connector.estimate_fee_pct(False) + self.assertAlmostEqual(taker_fee, Decimal("0.0026")) + + def test_buy_and_sell(self): + price = self.connector.get_price(self.trading_pair, True) * Decimal("1.02") + price = self.connector.quantize_order_price(self.trading_pair, price) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + quote_bal = self.connector.get_available_balance(self.quote_token) + base_bal = self.connector.get_available_balance(self.base_token) + + order_id = self._place_order(True, amount, OrderType.LIMIT, price, 1) + order_completed_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCompletedEvent)) + self.ev_loop.run_until_complete(asyncio.sleep(5)) + trade_events = [t for t in self.event_logger.event_log if isinstance(t, OrderFilledEvent)] + base_amount_traded = sum(t.amount for t in trade_events) + quote_amount_traded = sum(t.amount * t.price for t in trade_events) + + self.assertTrue([evt.order_type == OrderType.LIMIT for evt in trade_events]) + self.assertEqual(order_id, order_completed_event.order_id) + self.assertEqual(amount, order_completed_event.base_asset_amount) + self.assertEqual("BTC", order_completed_event.base_asset) + self.assertEqual("USD", order_completed_event.quote_asset) + self.assertAlmostEqual(base_amount_traded, order_completed_event.base_asset_amount) + self.assertAlmostEqual(quote_amount_traded, order_completed_event.quote_asset_amount) + self.assertGreater(order_completed_event.fee_amount, Decimal(0)) + self.assertTrue(any([isinstance(event, BuyOrderCreatedEvent) and str(event.order_id) == str(order_id) + for event in self.event_logger.event_log])) + + # check available quote balance gets updated, we need to wait a bit for the balance message to arrive + expected_quote_bal = quote_bal - quote_amount_traded + # self.ev_loop.run_until_complete(asyncio.sleep(1)) + self.ev_loop.run_until_complete(self.connector._update_balances()) + self.assertAlmostEqual(expected_quote_bal, self.connector.get_available_balance(self.quote_token), 1) + + # Reset the logs + self.event_logger.clear() + + # Try to sell back the same amount to the exchange, and watch for completion event. + price = self.connector.get_price(self.trading_pair, True) * Decimal("0.98") + price = self.connector.quantize_order_price(self.trading_pair, price) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + order_id = self._place_order(False, amount, OrderType.LIMIT, price, 2) + order_completed_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(SellOrderCompletedEvent)) + trade_events = [t for t in self.event_logger.event_log if isinstance(t, OrderFilledEvent)] + base_amount_traded = sum(t.amount for t in trade_events) + quote_amount_traded = sum(t.amount * t.price for t in trade_events) + + self.assertTrue([evt.order_type == OrderType.LIMIT for evt in trade_events]) + self.assertEqual(order_id, order_completed_event.order_id) + self.assertEqual(amount, order_completed_event.base_asset_amount) + self.assertEqual("BTC", order_completed_event.base_asset) + self.assertEqual("USD", order_completed_event.quote_asset) + self.assertAlmostEqual(base_amount_traded, order_completed_event.base_asset_amount) + self.assertAlmostEqual(quote_amount_traded, order_completed_event.quote_asset_amount) + self.assertGreater(order_completed_event.fee_amount, Decimal(0)) + self.assertTrue(any([isinstance(event, SellOrderCreatedEvent) and event.order_id == order_id + for event in self.event_logger.event_log])) + + # check available base balance gets updated, we need to wait a bit for the balance message to arrive + expected_base_bal = base_bal + self.ev_loop.run_until_complete(asyncio.sleep(1)) + self.ev_loop.run_until_complete(self.connector._update_balances()) + self.ev_loop.run_until_complete(asyncio.sleep(5)) + self.assertAlmostEqual(expected_base_bal, self.connector.get_available_balance(self.base_token), 5) + + def test_limit_makers_unfilled(self): + price = self.connector.get_price(self.trading_pair, True) * Decimal("0.8") + price = self.connector.quantize_order_price(self.trading_pair, price) + price_quantum = self.connector.get_order_price_quantum(self.trading_pair, price) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + self.ev_loop.run_until_complete(asyncio.sleep(1)) + self.ev_loop.run_until_complete(self.connector._update_balances()) + self.ev_loop.run_until_complete(asyncio.sleep(2)) + quote_bal = self.connector.get_available_balance(self.quote_token) + + cl_order_id = self._place_order(True, amount, OrderType.LIMIT_MAKER, price, 1) + order_created_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCreatedEvent)) + self.assertEqual(cl_order_id, order_created_event.order_id) + # check available quote balance gets updated, we need to wait a bit for the balance message to arrive + taker_fee = self.connector.estimate_fee_pct(False) + quote_amount = (math.ceil(((price * amount) * (Decimal("1") + taker_fee)) / price_quantum) * price_quantum) + expected_quote_bal = quote_bal - quote_amount + self.ev_loop.run_until_complete(asyncio.sleep(1)) + self.ev_loop.run_until_complete(self.connector._update_balances()) + self.ev_loop.run_until_complete(asyncio.sleep(2)) + + self.assertAlmostEqual(expected_quote_bal, self.connector.get_available_balance(self.quote_token), 5) + self._cancel_order(cl_order_id) + event = self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + self.assertEqual(cl_order_id, event.order_id) + + price = self.connector.get_price(self.trading_pair, True) * Decimal("1.2") + price = self.connector.quantize_order_price(self.trading_pair, price) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + + cl_order_id = self._place_order(False, amount, OrderType.LIMIT_MAKER, price, 2) + order_created_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(SellOrderCreatedEvent)) + self.assertEqual(cl_order_id, order_created_event.order_id) + self._cancel_order(cl_order_id) + event = self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + self.assertEqual(cl_order_id, event.order_id) + + # # @TODO: find a way to create "rejected" + # def test_limit_maker_rejections(self): + # price = self.connector.get_price(self.trading_pair, True) * Decimal("1.2") + # price = self.connector.quantize_order_price(self.trading_pair, price) + # amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.000001")) + # cl_order_id = self._place_order(True, amount, OrderType.LIMIT_MAKER, price, 1) + # event = self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + # self.assertEqual(cl_order_id, event.order_id) + + # price = self.connector.get_price(self.trading_pair, False) * Decimal("0.8") + # price = self.connector.quantize_order_price(self.trading_pair, price) + # amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.000001")) + # cl_order_id = self._place_order(False, amount, OrderType.LIMIT_MAKER, price, 2) + # event = self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + # self.assertEqual(cl_order_id, event.order_id) + + def test_cancel_all(self): + bid_price = self.connector.get_price(self.trading_pair, True) + ask_price = self.connector.get_price(self.trading_pair, False) + bid_price = self.connector.quantize_order_price(self.trading_pair, bid_price * Decimal("0.9")) + ask_price = self.connector.quantize_order_price(self.trading_pair, ask_price * Decimal("1.1")) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + + buy_id = self._place_order(True, amount, OrderType.LIMIT, bid_price, 1) + sell_id = self._place_order(False, amount, OrderType.LIMIT, ask_price, 2) + + self.ev_loop.run_until_complete(asyncio.sleep(1)) + asyncio.ensure_future(self.connector.cancel_all(15)) + self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + self.ev_loop.run_until_complete(asyncio.sleep(1)) + cancel_events = [t for t in self.event_logger.event_log if isinstance(t, OrderCancelledEvent)] + self.assertEqual({buy_id, sell_id}, {o.order_id for o in cancel_events}) + + def test_order_quantized_values(self): + bid_price: Decimal = self.connector.get_price(self.trading_pair, True) + ask_price: Decimal = self.connector.get_price(self.trading_pair, False) + mid_price: Decimal = (bid_price + ask_price) / 2 + + # Make sure there's enough balance to make the limit orders. + self.assertGreater(self.connector.get_balance("BTC"), Decimal("0.0005")) + self.assertGreater(self.connector.get_balance("USD"), Decimal("10")) + + # Intentionally set some prices with too many decimal places s.t. they + # need to be quantized. Also, place them far away from the mid-price s.t. they won't + # get filled during the test. + bid_price = self.connector.quantize_order_price(self.trading_pair, mid_price * Decimal("0.9333192292111341")) + ask_price = self.connector.quantize_order_price(self.trading_pair, mid_price * Decimal("1.1492431474884933")) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.000123456")) + + # Test bid order + cl_order_id_1 = self._place_order(True, amount, OrderType.LIMIT, bid_price, 1) + # Wait for the order created event and examine the order made + self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCreatedEvent)) + + # Test ask order + cl_order_id_2 = self._place_order(False, amount, OrderType.LIMIT, ask_price, 1) + # Wait for the order created event and examine and order made + self.ev_loop.run_until_complete(self.event_logger.wait_for(SellOrderCreatedEvent)) + + self._cancel_order(cl_order_id_1) + self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + self._cancel_order(cl_order_id_2) + self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + + def test_orders_saving_and_restoration(self): + config_path = "test_config" + strategy_name = "test_strategy" + sql = SQLConnectionManager(SQLConnectionType.TRADE_FILLS, db_path=self.db_path) + order_id = None + recorder = MarketsRecorder(sql, [self.connector], config_path, strategy_name) + recorder.start() + + try: + self.connector._in_flight_orders.clear() + self.assertEqual(0, len(self.connector.tracking_states)) + + # Try to put limit buy order for 0.02 ETH worth of ZRX, and watch for order creation event. + current_bid_price: Decimal = self.connector.get_price(self.trading_pair, True) + price: Decimal = current_bid_price * Decimal("0.8") + price = self.connector.quantize_order_price(self.trading_pair, price) + + amount: Decimal = Decimal("0.0001") + amount = self.connector.quantize_order_amount(self.trading_pair, amount) + + cl_order_id = self._place_order(True, amount, OrderType.LIMIT_MAKER, price, 1) + order_created_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCreatedEvent)) + self.assertEqual(cl_order_id, order_created_event.order_id) + + # Verify tracking states + self.assertEqual(1, len(self.connector.tracking_states)) + self.assertEqual(cl_order_id, list(self.connector.tracking_states.keys())[0]) + + # Verify orders from recorder + recorded_orders: List[Order] = recorder.get_orders_for_config_and_market(config_path, self.connector) + self.assertEqual(1, len(recorded_orders)) + self.assertEqual(cl_order_id, recorded_orders[0].id) + + # Verify saved market states + saved_market_states: MarketState = recorder.get_market_states(config_path, self.connector) + self.assertIsNotNone(saved_market_states) + self.assertIsInstance(saved_market_states.saved_state, dict) + self.assertGreater(len(saved_market_states.saved_state), 0) + + # Close out the current market and start another market. + self.connector.stop(self._clock) + self.ev_loop.run_until_complete(asyncio.sleep(5)) + self.clock.remove_iterator(self.connector) + for event_tag in self.events: + self.connector.remove_listener(event_tag, self.event_logger) + # Clear the event loop + self.event_logger.clear() + new_connector = CoinzoomExchange(API_KEY, API_SECRET, API_USERNAME, [self.trading_pair], True) + for event_tag in self.events: + new_connector.add_listener(event_tag, self.event_logger) + recorder.stop() + recorder = MarketsRecorder(sql, [new_connector], config_path, strategy_name) + recorder.start() + saved_market_states = recorder.get_market_states(config_path, new_connector) + self.clock.add_iterator(new_connector) + self.ev_loop.run_until_complete(self.wait_til_ready(new_connector)) + self.assertEqual(0, len(new_connector.limit_orders)) + self.assertEqual(0, len(new_connector.tracking_states)) + new_connector.restore_tracking_states(saved_market_states.saved_state) + self.assertEqual(1, len(new_connector.limit_orders)) + self.assertEqual(1, len(new_connector.tracking_states)) + + # Cancel the order and verify that the change is saved. + self._cancel_order(cl_order_id, new_connector) + self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + recorder.save_market_states(config_path, new_connector) + order_id = None + self.assertEqual(0, len(new_connector.limit_orders)) + self.assertEqual(0, len(new_connector.tracking_states)) + saved_market_states = recorder.get_market_states(config_path, new_connector) + self.assertEqual(0, len(saved_market_states.saved_state)) + finally: + if order_id is not None: + self.connector.cancel(self.trading_pair, cl_order_id) + self.run_parallel(self.event_logger.wait_for(OrderCancelledEvent)) + + recorder.stop() + os.unlink(self.db_path) + + def test_update_last_prices(self): + # This is basic test to see if order_book last_trade_price is initiated and updated. + for order_book in self.connector.order_books.values(): + for _ in range(5): + self.ev_loop.run_until_complete(asyncio.sleep(1)) + self.assertFalse(math.isnan(order_book.last_trade_price)) + + def test_filled_orders_recorded(self): + config_path: str = "test_config" + strategy_name: str = "test_strategy" + sql = SQLConnectionManager(SQLConnectionType.TRADE_FILLS, db_path=self.db_path) + order_id = None + recorder = MarketsRecorder(sql, [self.connector], config_path, strategy_name) + recorder.start() + + try: + # Try to buy some token from the exchange, and watch for completion event. + price = self.connector.get_price(self.trading_pair, True) * Decimal("1.05") + price = self.connector.quantize_order_price(self.trading_pair, price) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + + order_id = self._place_order(True, amount, OrderType.LIMIT, price, 1) + self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCompletedEvent)) + self.ev_loop.run_until_complete(asyncio.sleep(1)) + + # Reset the logs + self.event_logger.clear() + + # Try to sell back the same amount to the exchange, and watch for completion event. + price = self.connector.get_price(self.trading_pair, True) * Decimal("0.95") + price = self.connector.quantize_order_price(self.trading_pair, price) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + order_id = self._place_order(False, amount, OrderType.LIMIT, price, 2) + self.ev_loop.run_until_complete(self.event_logger.wait_for(SellOrderCompletedEvent)) + self.ev_loop.run_until_complete(asyncio.sleep(1)) + + # Query the persisted trade logs + trade_fills: List[TradeFill] = recorder.get_trades_for_config(config_path) + self.assertGreaterEqual(len(trade_fills), 2) + buy_fills: List[TradeFill] = [t for t in trade_fills if t.trade_type == "BUY"] + sell_fills: List[TradeFill] = [t for t in trade_fills if t.trade_type == "SELL"] + self.assertGreaterEqual(len(buy_fills), 1) + self.assertGreaterEqual(len(sell_fills), 1) + + order_id = None + + finally: + if order_id is not None: + self.connector.cancel(self.trading_pair, order_id) + self.run_parallel(self.event_logger.wait_for(OrderCancelledEvent)) + + recorder.stop() + os.unlink(self.db_path) diff --git a/test/connector/exchange/coinzoom/test_coinzoom_order_book_tracker.py b/test/connector/exchange/coinzoom/test_coinzoom_order_book_tracker.py new file mode 100755 index 0000000000..62a1a1b6d9 --- /dev/null +++ b/test/connector/exchange/coinzoom/test_coinzoom_order_book_tracker.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python +import sys +import math +import time +import asyncio +import logging +import unittest +from async_timeout import timeout +from os.path import join, realpath +from typing import Dict, Optional, List +from hummingbot.core.event.event_logger import EventLogger +from hummingbot.core.event.events import OrderBookEvent, OrderBookTradeEvent, TradeType +from hummingbot.connector.exchange.coinzoom.coinzoom_order_book_tracker import CoinzoomOrderBookTracker +from hummingbot.connector.exchange.coinzoom.coinzoom_api_order_book_data_source import CoinzoomAPIOrderBookDataSource +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL + + +sys.path.insert(0, realpath(join(__file__, "../../../../../"))) +logging.basicConfig(level=METRICS_LOG_LEVEL) + + +class CoinzoomOrderBookTrackerUnitTest(unittest.TestCase): + order_book_tracker: Optional[CoinzoomOrderBookTracker] = None + events: List[OrderBookEvent] = [ + OrderBookEvent.TradeEvent + ] + trading_pairs: List[str] = [ + "BTC-USD", + "ETH-USD", + ] + + @classmethod + def setUpClass(cls): + cls.ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop() + cls.order_book_tracker: CoinzoomOrderBookTracker = CoinzoomOrderBookTracker(cls.trading_pairs) + cls.order_book_tracker.start() + cls.ev_loop.run_until_complete(cls.wait_til_tracker_ready()) + + @classmethod + async def wait_til_tracker_ready(cls): + async with timeout(20): + while True: + if len(cls.order_book_tracker.order_books) > 0: + print("Initialized real-time order books.") + return + await asyncio.sleep(1) + + async def run_parallel_async(self, *tasks, timeout=None): + future: asyncio.Future = asyncio.ensure_future(asyncio.gather(*tasks)) + timer = 0 + while not future.done(): + if timeout and timer > timeout: + raise Exception("Timeout running parallel async tasks in tests") + timer += 1 + now = time.time() + _next_iteration = now // 1.0 + 1 # noqa: F841 + await asyncio.sleep(1.0) + return future.result() + + def run_parallel(self, *tasks): + return self.ev_loop.run_until_complete(self.run_parallel_async(*tasks, timeout=60)) + + def setUp(self): + self.event_logger = EventLogger() + for event_tag in self.events: + for trading_pair, order_book in self.order_book_tracker.order_books.items(): + order_book.add_listener(event_tag, self.event_logger) + + def test_order_book_trade_event_emission(self): + """ + Tests if the order book tracker is able to retrieve order book trade message from exchange and emit order book + trade events after correctly parsing the trade messages + """ + self.run_parallel(self.event_logger.wait_for(OrderBookTradeEvent)) + for ob_trade_event in self.event_logger.event_log: + self.assertTrue(type(ob_trade_event) == OrderBookTradeEvent) + self.assertTrue(ob_trade_event.trading_pair in self.trading_pairs) + self.assertTrue(type(ob_trade_event.timestamp) in [float, int]) + self.assertTrue(type(ob_trade_event.amount) == float) + self.assertTrue(type(ob_trade_event.price) == float) + self.assertTrue(type(ob_trade_event.type) == TradeType) + # datetime is in milliseconds + self.assertTrue(math.ceil(math.log10(ob_trade_event.timestamp)) == 13) + self.assertTrue(ob_trade_event.amount > 0) + self.assertTrue(ob_trade_event.price > 0) + + def test_tracker_integrity(self): + # Wait 5 seconds to process some diffs. + self.ev_loop.run_until_complete(asyncio.sleep(5.0)) + order_books: Dict[str, OrderBook] = self.order_book_tracker.order_books + eth_usd: OrderBook = order_books["ETH-USD"] + self.assertIsNot(eth_usd.last_diff_uid, 0) + self.assertGreaterEqual(eth_usd.get_price_for_volume(True, 10).result_price, + eth_usd.get_price(True)) + self.assertLessEqual(eth_usd.get_price_for_volume(False, 10).result_price, + eth_usd.get_price(False)) + + def test_api_get_last_traded_prices(self): + prices = self.ev_loop.run_until_complete( + CoinzoomAPIOrderBookDataSource.get_last_traded_prices(["BTC-USD", "LTC-BTC"])) + for key, value in prices.items(): + print(f"{key} last_trade_price: {value}") + self.assertGreater(prices["BTC-USD"], 1000) + self.assertLess(prices["LTC-BTC"], 1) diff --git a/test/connector/exchange/coinzoom/test_coinzoom_user_stream_tracker.py b/test/connector/exchange/coinzoom/test_coinzoom_user_stream_tracker.py new file mode 100644 index 0000000000..f9f85f335d --- /dev/null +++ b/test/connector/exchange/coinzoom/test_coinzoom_user_stream_tracker.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python + +import sys +import asyncio +import logging +import unittest +import conf + +from os.path import join, realpath +from hummingbot.connector.exchange.coinzoom.coinzoom_user_stream_tracker import CoinzoomUserStreamTracker +from hummingbot.connector.exchange.coinzoom.coinzoom_auth import CoinzoomAuth +from hummingbot.core.utils.async_utils import safe_ensure_future +from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL + + +sys.path.insert(0, realpath(join(__file__, "../../../../../"))) +logging.basicConfig(level=METRICS_LOG_LEVEL) + + +class CoinzoomUserStreamTrackerUnitTest(unittest.TestCase): + api_key = conf.coinzoom_api_key + api_secret = conf.coinzoom_secret_key + api_username = conf.coinzoom_username + + @classmethod + def setUpClass(cls): + cls.ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop() + cls.trading_pairs = ["BTC-USD"] + cls.user_stream_tracker: CoinzoomUserStreamTracker = CoinzoomUserStreamTracker( + coinzoom_auth=CoinzoomAuth(cls.api_key, cls.api_secret, cls.api_username), + trading_pairs=cls.trading_pairs) + cls.user_stream_tracker_task: asyncio.Task = safe_ensure_future(cls.user_stream_tracker.start()) + + def test_user_stream(self): + # Wait process some msgs. + print("Sleeping for 30s to gather some user stream messages.") + self.ev_loop.run_until_complete(asyncio.sleep(30.0)) + print(self.user_stream_tracker.user_stream)