diff --git a/README.md b/README.md index 10417f0950..eb9e13553d 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ We created hummingbot to promote **decentralized market-making**: enabling membe | Crypto.com | crypto_com | [Crypto.com](https://crypto.com/exchange) | 2 | [API](https://exchange-docs.crypto.com/#introduction) |![YELLOW](https://via.placeholder.com/15/ffff00/?text=+) | | DyDx | dydx | [dy/dx](https://dydx.exchange/) | 1 | [API](https://docs.dydx.exchange/) |![GREEN](https://via.placeholder.com/15/008000/?text=+) | | Eterbase | eterbase | [Eterbase](https://www.eterbase.com/) | * | [API](https://developers.eterbase.exchange/?version=latest) |![RED](https://via.placeholder.com/15/f03c15/?text=+) | +| HitBTC | hitbtc | [HitBTC](https://hitbtc.com/) | 2 | [API](https://api.hitbtc.com/) | ![YELLOW](https://via.placeholder.com/15/ffff00/?text=+) | |Huobi Global| huobi | [Huobi Global](https://www.hbg.com) | 1 | [API](https://huobiapi.github.io/docs/spot/v1/en/) |![GREEN](https://via.placeholder.com/15/008000/?text=+) | | KuCoin | kucoin | [KuCoin](https://www.kucoin.com/) | 1 | [API](https://docs.kucoin.com/#general) |![GREEN](https://via.placeholder.com/15/008000/?text=+) | | Kraken | kraken | [Kraken](https://www.kraken.com/) | 1 | [API](https://www.kraken.com/features/api) |![GREEN](https://via.placeholder.com/15/008000/?text=+) | diff --git a/assets/hitbtc_logo.png b/assets/hitbtc_logo.png new file mode 100644 index 0000000000..efdac10517 Binary files /dev/null and b/assets/hitbtc_logo.png differ diff --git a/conf/__init__.py b/conf/__init__.py index 65ba5a8347..7854c51a99 100644 --- a/conf/__init__.py +++ b/conf/__init__.py @@ -104,6 +104,10 @@ crypto_com_api_key = os.getenv("CRYPTO_COM_API_KEY") crypto_com_secret_key = os.getenv("CRYPTO_COM_SECRET_KEY") +# HitBTC Tests +hitbtc_api_key = os.getenv("HITBTC_API_KEY") +hitbtc_secret_key = os.getenv("HITBTC_SECRET_KEY") + # Wallet Tests test_erc20_token_address = os.getenv("TEST_ERC20_TOKEN_ADDRESS") web3_test_private_key_a = os.getenv("TEST_WALLET_PRIVATE_KEY_A") diff --git a/hummingbot/connector/connector_status.py b/hummingbot/connector/connector_status.py index f220808b1e..2f35999bb7 100644 --- a/hummingbot/connector/connector_status.py +++ b/hummingbot/connector/connector_status.py @@ -17,6 +17,7 @@ 'dydx': 'green', 'eterbase': 'red', 'ethereum': 'red', + 'hitbtc': 'yellow', 'huobi': 'green', 'kraken': 'green', 'kucoin': 'green', diff --git a/hummingbot/connector/exchange/hitbtc/__init__.py b/hummingbot/connector/exchange/hitbtc/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_active_order_tracker.pxd b/hummingbot/connector/exchange/hitbtc/hitbtc_active_order_tracker.pxd new file mode 100644 index 0000000000..5babac5332 --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_active_order_tracker.pxd @@ -0,0 +1,11 @@ +# distutils: language=c++ +cimport numpy as np + +cdef class HitbtcActiveOrderTracker: + cdef dict _active_bids + cdef dict _active_asks + + cdef tuple c_convert_diff_message_to_np_arrays(self, object message) + cdef tuple c_convert_snapshot_message_to_np_arrays(self, object message) + # This method doesn't seem to be used anywhere at all + # cdef np.ndarray[np.float64_t, ndim=1] c_convert_trade_message_to_np_array(self, object message) diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_active_order_tracker.pyx b/hummingbot/connector/exchange/hitbtc/hitbtc_active_order_tracker.pyx new file mode 100644 index 0000000000..5e248bb3d5 --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_active_order_tracker.pyx @@ -0,0 +1,155 @@ +# distutils: language=c++ +# distutils: sources=hummingbot/core/cpp/OrderBookEntry.cpp +import logging +import numpy as np +from decimal import Decimal +from typing import Dict +from hummingbot.logger import HummingbotLogger +from hummingbot.core.data_type.order_book_row import OrderBookRow + +_logger = None +s_empty_diff = np.ndarray(shape=(0, 4), dtype="float64") +HitbtcOrderBookTrackingDictionary = Dict[Decimal, Dict[str, Dict[str, any]]] + +cdef class HitbtcActiveOrderTracker: + def __init__(self, + active_asks: HitbtcOrderBookTrackingDictionary = None, + active_bids: HitbtcOrderBookTrackingDictionary = None): + super().__init__() + self._active_asks = active_asks or {} + self._active_bids = active_bids or {} + + @classmethod + def logger(cls) -> HummingbotLogger: + global _logger + if _logger is None: + _logger = logging.getLogger(__name__) + return _logger + + @property + def active_asks(self) -> HitbtcOrderBookTrackingDictionary: + return self._active_asks + + @property + def active_bids(self) -> HitbtcOrderBookTrackingDictionary: + return self._active_bids + + # TODO: research this more + def volume_for_ask_price(self, price) -> float: + return NotImplementedError + + # TODO: research this more + def volume_for_bid_price(self, price) -> float: + return NotImplementedError + + def get_rates_and_quantities(self, entry) -> tuple: + # price, quantity + return float(entry["price"]), float(entry["size"]) + + cdef tuple c_convert_diff_message_to_np_arrays(self, object message): + cdef: + dict content = message.content + list content_keys = list(content.keys()) + list bid_entries = [] + list ask_entries = [] + str order_id + str order_side + str price_raw + object price + dict order_dict + double timestamp = message.timestamp + double amount = 0 + + if "bid" in content_keys: + bid_entries = content["bid"] + if "ask" in content_keys: + ask_entries = content["ask"] + + bids = s_empty_diff + asks = s_empty_diff + + if len(bid_entries) > 0: + bids = np.array( + [[timestamp, + price, + amount, + message.update_id] + for price, amount in [self.get_rates_and_quantities(entry) for entry in bid_entries]], + dtype="float64", + ndmin=2 + ) + + if len(ask_entries) > 0: + asks = np.array( + [[timestamp, + price, + amount, + message.update_id] + for price, amount in [self.get_rates_and_quantities(entry) for entry in ask_entries]], + dtype="float64", + ndmin=2 + ) + + return bids, asks + + cdef tuple c_convert_snapshot_message_to_np_arrays(self, object message): + cdef: + float price + float amount + str order_id + dict order_dict + + # Refresh all order tracking. + self._active_bids.clear() + self._active_asks.clear() + timestamp = message.timestamp + content = message.content + + for snapshot_orders, active_orders in [(content["bid"], self._active_bids), (content["ask"], self._active_asks)]: + for entry in snapshot_orders: + price, amount = self.get_rates_and_quantities(entry) + active_orders[price] = amount + + # Return the sorted snapshot tables. + cdef: + np.ndarray[np.float64_t, ndim=2] bids = np.array( + [[message.timestamp, + float(price), + float(self._active_bids[price]), + message.update_id] + for price in sorted(self._active_bids.keys())], dtype='float64', ndmin=2) + np.ndarray[np.float64_t, ndim=2] asks = np.array( + [[message.timestamp, + float(price), + float(self._active_asks[price]), + message.update_id] + for price in sorted(self._active_asks.keys(), reverse=True)], dtype='float64', ndmin=2) + + if bids.shape[1] != 4: + bids = bids.reshape((0, 4)) + if asks.shape[1] != 4: + asks = asks.reshape((0, 4)) + + return bids, asks + + # This method doesn't seem to be used anywhere at all + # cdef np.ndarray[np.float64_t, ndim=1] c_convert_trade_message_to_np_array(self, object message): + # cdef: + # double trade_type_value = 1.0 if message.content["side"] == "buy" else 2.0 + # list content = message.content + # return np.array( + # [message.timestamp, trade_type_value, float(content["price"]), float(content["quantity"])], + # dtype="float64" + # ) + + def convert_diff_message_to_order_book_row(self, message): + np_bids, np_asks = self.c_convert_diff_message_to_np_arrays(message) + bids_row = [OrderBookRow(price, qty, update_id) for ts, price, qty, update_id in np_bids] + asks_row = [OrderBookRow(price, qty, update_id) for ts, price, qty, update_id in np_asks] + return bids_row, asks_row + + def convert_snapshot_message_to_order_book_row(self, message): + np_bids, np_asks = self.c_convert_snapshot_message_to_np_arrays(message) + bids_row = [OrderBookRow(price, qty, update_id) for ts, price, qty, update_id in np_bids] + asks_row = [OrderBookRow(price, qty, update_id) for ts, price, qty, update_id in np_asks] + return bids_row, asks_row diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_api_order_book_data_source.py b/hummingbot/connector/exchange/hitbtc/hitbtc_api_order_book_data_source.py new file mode 100644 index 0000000000..40d83516da --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_api_order_book_data_source.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python +import asyncio +import logging +import time +import pandas as pd +from decimal import Decimal +from typing import Optional, List, Dict, Any +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.core.data_type.order_book_message import OrderBookMessage +from hummingbot.core.data_type.order_book_tracker_data_source import OrderBookTrackerDataSource +from hummingbot.logger import HummingbotLogger +from .hitbtc_constants import Constants +from .hitbtc_active_order_tracker import HitbtcActiveOrderTracker +from .hitbtc_order_book import HitbtcOrderBook +from .hitbtc_websocket import HitbtcWebsocket +from .hitbtc_utils import ( + str_date_to_ts, + convert_to_exchange_trading_pair, + convert_from_exchange_trading_pair, + api_call_with_retries, + HitbtcAPIError, +) + + +class HitbtcAPIOrderBookDataSource(OrderBookTrackerDataSource): + _logger: Optional[HummingbotLogger] = None + + @classmethod + def logger(cls) -> HummingbotLogger: + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + def __init__(self, trading_pairs: List[str] = None): + super().__init__(trading_pairs) + self._trading_pairs: List[str] = trading_pairs + self._snapshot_msg: Dict[str, any] = {} + + @classmethod + async def get_last_traded_prices(cls, trading_pairs: List[str]) -> Dict[str, Decimal]: + results = {} + if len(trading_pairs) > 1: + tickers: List[Dict[Any]] = await api_call_with_retries("GET", Constants.ENDPOINT["TICKER"]) + for trading_pair in trading_pairs: + ex_pair: str = convert_to_exchange_trading_pair(trading_pair) + if len(trading_pairs) > 1: + ticker: Dict[Any] = list([tic for tic in tickers if tic['symbol'] == ex_pair])[0] + else: + url_endpoint = Constants.ENDPOINT["TICKER_SINGLE"].format(trading_pair=ex_pair) + ticker: Dict[Any] = await api_call_with_retries("GET", url_endpoint) + results[trading_pair]: Decimal = Decimal(str(ticker["last"])) + return results + + @staticmethod + async def fetch_trading_pairs() -> List[str]: + try: + symbols: List[Dict[str, Any]] = await api_call_with_retries("GET", Constants.ENDPOINT["SYMBOL"]) + trading_pairs: List[str] = list([convert_from_exchange_trading_pair(sym["id"]) for sym in symbols]) + # Filter out unmatched pairs so nothing breaks + return [sym for sym in trading_pairs if sym is not None] + except Exception: + # Do nothing if the request fails -- there will be no autocomplete for HitBTC trading pairs + pass + return [] + + @staticmethod + async def get_order_book_data(trading_pair: str) -> Dict[str, any]: + """ + Get whole orderbook + """ + try: + ex_pair = convert_to_exchange_trading_pair(trading_pair) + orderbook_response: Dict[Any] = await api_call_with_retries("GET", Constants.ENDPOINT["ORDER_BOOK"], + params={"limit": 150, "symbols": ex_pair}) + return orderbook_response[ex_pair] + except HitbtcAPIError as e: + err = e.error_payload.get('error', e.error_payload) + raise IOError( + f"Error fetching OrderBook for {trading_pair} at {Constants.EXCHANGE_NAME}. " + f"HTTP status is {e.error_payload['status']}. Error is {err.get('message', str(err))}.") + + async def get_new_order_book(self, trading_pair: str) -> OrderBook: + snapshot: Dict[str, Any] = await self.get_order_book_data(trading_pair) + snapshot_timestamp: float = time.time() + snapshot_msg: OrderBookMessage = HitbtcOrderBook.snapshot_message_from_exchange( + snapshot, + snapshot_timestamp, + metadata={"trading_pair": trading_pair}) + order_book = self.order_book_create_function() + active_order_tracker: HitbtcActiveOrderTracker = HitbtcActiveOrderTracker() + bids, asks = active_order_tracker.convert_snapshot_message_to_order_book_row(snapshot_msg) + order_book.apply_snapshot(bids, asks, snapshot_msg.update_id) + return order_book + + async def listen_for_trades(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue): + """ + Listen for trades using websocket trade channel + """ + while True: + try: + ws = HitbtcWebsocket() + await ws.connect() + + for pair in self._trading_pairs: + await ws.subscribe(Constants.WS_SUB["TRADES"], convert_to_exchange_trading_pair(pair)) + + async for response in ws.on_message(): + method: str = response.get("method", None) + trades_data: str = response.get("params", None) + + if trades_data is None or method != Constants.WS_METHODS['TRADES_UPDATE']: + continue + + pair: str = convert_from_exchange_trading_pair(response["params"]["symbol"]) + + for trade in trades_data["data"]: + trade: Dict[Any] = trade + trade_timestamp: int = str_date_to_ts(trade["timestamp"]) + trade_msg: OrderBookMessage = HitbtcOrderBook.trade_message_from_exchange( + trade, + trade_timestamp, + metadata={"trading_pair": pair}) + output.put_nowait(trade_msg) + + except asyncio.CancelledError: + raise + except Exception: + self.logger().error("Unexpected error.", exc_info=True) + await asyncio.sleep(5.0) + finally: + await ws.disconnect() + + async def listen_for_order_book_diffs(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue): + """ + Listen for orderbook diffs using websocket book channel + """ + while True: + try: + ws = HitbtcWebsocket() + await ws.connect() + + order_book_methods = [ + Constants.WS_METHODS['ORDERS_SNAPSHOT'], + Constants.WS_METHODS['ORDERS_UPDATE'], + ] + + for pair in self._trading_pairs: + await ws.subscribe(Constants.WS_SUB["ORDERS"], convert_to_exchange_trading_pair(pair)) + + async for response in ws.on_message(): + method: str = response.get("method", None) + order_book_data: str = response.get("params", None) + + if order_book_data is None or method not in order_book_methods: + continue + + timestamp: int = str_date_to_ts(order_book_data["timestamp"]) + pair: str = convert_from_exchange_trading_pair(order_book_data["symbol"]) + + order_book_msg_cls = (HitbtcOrderBook.diff_message_from_exchange + if method == Constants.WS_METHODS['ORDERS_UPDATE'] else + HitbtcOrderBook.snapshot_message_from_exchange) + + orderbook_msg: OrderBookMessage = order_book_msg_cls( + order_book_data, + timestamp, + metadata={"trading_pair": pair}) + output.put_nowait(orderbook_msg) + + except asyncio.CancelledError: + raise + except Exception: + self.logger().network( + "Unexpected error with WebSocket connection.", exc_info=True, + app_warning_msg="Unexpected error with WebSocket connection. Retrying in 30 seconds. " + "Check network connection.") + await asyncio.sleep(30.0) + finally: + await ws.disconnect() + + async def listen_for_order_book_snapshots(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue): + """ + Listen for orderbook snapshots by fetching orderbook + """ + while True: + try: + for trading_pair in self._trading_pairs: + try: + snapshot: Dict[str, any] = await self.get_order_book_data(trading_pair) + snapshot_timestamp: int = str_date_to_ts(snapshot["timestamp"]) + snapshot_msg: OrderBookMessage = HitbtcOrderBook.snapshot_message_from_exchange( + snapshot, + snapshot_timestamp, + metadata={"trading_pair": trading_pair} + ) + output.put_nowait(snapshot_msg) + self.logger().debug(f"Saved order book snapshot for {trading_pair}") + # Be careful not to go above API rate limits. + await asyncio.sleep(5.0) + except asyncio.CancelledError: + raise + except Exception: + self.logger().network( + "Unexpected error with WebSocket connection.", exc_info=True, + app_warning_msg="Unexpected error with WebSocket connection. Retrying in 5 seconds. " + "Check network connection.") + await asyncio.sleep(5.0) + this_hour: pd.Timestamp = pd.Timestamp.utcnow().replace(minute=0, second=0, microsecond=0) + next_hour: pd.Timestamp = this_hour + pd.Timedelta(hours=1) + delta: float = next_hour.timestamp() - time.time() + await asyncio.sleep(delta) + except asyncio.CancelledError: + raise + except Exception: + self.logger().error("Unexpected error.", exc_info=True) + await asyncio.sleep(5.0) diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_api_user_stream_data_source.py b/hummingbot/connector/exchange/hitbtc/hitbtc_api_user_stream_data_source.py new file mode 100755 index 0000000000..954ab9c344 --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_api_user_stream_data_source.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python +import time +import asyncio +import logging +from typing import ( + Any, + AsyncIterable, + List, + Optional, +) +from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource +from hummingbot.logger import HummingbotLogger +from .hitbtc_constants import Constants +from .hitbtc_auth import HitbtcAuth +from .hitbtc_utils import HitbtcAPIError +from .hitbtc_websocket import HitbtcWebsocket + + +class HitbtcAPIUserStreamDataSource(UserStreamTrackerDataSource): + + _logger: Optional[HummingbotLogger] = None + + @classmethod + def logger(cls) -> HummingbotLogger: + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + def __init__(self, hitbtc_auth: HitbtcAuth, trading_pairs: Optional[List[str]] = []): + self._hitbtc_auth: HitbtcAuth = hitbtc_auth + self._ws: HitbtcWebsocket = None + self._trading_pairs = trading_pairs + self._current_listen_key = None + self._listen_for_user_stream_task = None + self._last_recv_time: float = 0 + super().__init__() + + @property + def last_recv_time(self) -> float: + return self._last_recv_time + + async def _ws_request_balances(self): + return await self._ws.request(Constants.WS_METHODS["USER_BALANCE"]) + + async def _listen_to_orders_trades_balances(self) -> AsyncIterable[Any]: + """ + Subscribe to active orders via web socket + """ + + try: + self._ws = HitbtcWebsocket(self._hitbtc_auth) + + await self._ws.connect() + + await self._ws.subscribe(Constants.WS_SUB["USER_ORDERS_TRADES"], None, {}) + + event_methods = [ + Constants.WS_METHODS["USER_ORDERS"], + Constants.WS_METHODS["USER_TRADES"], + ] + + async for msg in self._ws.on_message(): + self._last_recv_time = time.time() + + if msg.get("params", msg.get("result", None)) is None: + continue + elif msg.get("method", None) in event_methods: + await self._ws_request_balances() + yield msg + except Exception as e: + raise e + finally: + await self._ws.disconnect() + await asyncio.sleep(5) + + async def listen_for_user_stream(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue) -> AsyncIterable[Any]: + """ + *required + Subscribe to user stream via web socket, and keep the connection open for incoming messages + :param ev_loop: ev_loop to execute this function in + :param output: an async queue where the incoming messages are stored + """ + + while True: + try: + async for msg in self._listen_to_orders_trades_balances(): + output.put_nowait(msg) + except asyncio.CancelledError: + raise + except HitbtcAPIError as e: + self.logger().error(e.error_payload.get('error'), exc_info=True) + raise + except Exception: + self.logger().error( + f"Unexpected error with {Constants.EXCHANGE_NAME} WebSocket connection. " + "Retrying after 30 seconds...", exc_info=True) + await asyncio.sleep(30.0) diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_auth.py b/hummingbot/connector/exchange/hitbtc/hitbtc_auth.py new file mode 100755 index 0000000000..be37f2e149 --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_auth.py @@ -0,0 +1,72 @@ +import hmac +import hashlib +import time +from base64 import b64encode +from typing import Dict, Any + + +class HitbtcAuth(): + """ + Auth class required by HitBTC API + Learn more at https://exchange-docs.crypto.com/#digital-signature + """ + def __init__(self, api_key: str, secret_key: str): + self.api_key = api_key + self.secret_key = secret_key + + def generate_payload( + self, + method: str, + url: str, + params: Dict[str, Any] = None, + ): + """ + Generates authentication payload and returns it. + :return: A base64 encoded payload for the authentication header. + """ + # Nonce is standard EPOCH timestamp only accurate to 1s + nonce = str(int(time.time())) + body = "" + # Need to build the full URL with query string for HS256 sig + if params is not None and len(params) > 0: + query_string = "&".join([f"{k}={v}" for k, v in params.items()]) + if method == "GET": + url = f"{url}?{query_string}" + else: + body = query_string + # Concat payload + payload = f"{method}{nonce}{url}{body}" + # Create HS256 sig + sig = hmac.new(self.secret_key.encode(), payload.encode(), hashlib.sha256).hexdigest() + # Base64 encode it with public key and nonce + return b64encode(f"{self.api_key}:{nonce}:{sig}".encode()).decode().strip() + + def generate_auth_dict_ws(self, + nonce: int): + """ + Generates an authentication params for HitBTC websockets login + :return: a dictionary of auth params + """ + return { + "algo": "HS256", + "pKey": str(self.api_key), + "nonce": str(nonce), + "signature": hmac.new(self.secret_key.encode('utf-8'), + str(nonce).encode('utf-8'), + hashlib.sha256).hexdigest() + } + + def get_headers(self, + method, + url, + params) -> Dict[str, Any]: + """ + Generates authentication headers required by HitBTC + :return: a dictionary of auth headers + """ + payload = self.generate_payload(method, url, params) + headers = { + "Authorization": f"HS256 {payload}", + "Content-Type": "application/x-www-form-urlencoded", + } + return headers diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_constants.py b/hummingbot/connector/exchange/hitbtc/hitbtc_constants.py new file mode 100644 index 0000000000..538e0b21f2 --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_constants.py @@ -0,0 +1,57 @@ +# A single source of truth for constant variables related to the exchange +class Constants: + EXCHANGE_NAME = "hitbtc" + REST_URL = "https://api.hitbtc.com/api/2" + REST_URL_AUTH = "/api/2" + WS_PRIVATE_URL = "wss://api.hitbtc.com/api/2/ws/trading" + WS_PUBLIC_URL = "wss://api.hitbtc.com/api/2/ws/public" + + HBOT_BROKER_ID = "refzzz48" + + ENDPOINT = { + # Public Endpoints + "TICKER": "public/ticker", + "TICKER_SINGLE": "public/ticker/{trading_pair}", + "SYMBOL": "public/symbol", + "ORDER_BOOK": "public/orderbook", + "ORDER_CREATE": "order", + "ORDER_DELETE": "order/{id}", + "ORDER_STATUS": "order/{id}", + "USER_ORDERS": "order", + "USER_BALANCES": "trading/balance", + } + + WS_SUB = { + "TRADES": "Trades", + "ORDERS": "Orderbook", + "USER_ORDERS_TRADES": "Reports", + + } + + WS_METHODS = { + "ORDERS_SNAPSHOT": "snapshotOrderbook", + "ORDERS_UPDATE": "updateOrderbook", + "TRADES_SNAPSHOT": "snapshotTrades", + "TRADES_UPDATE": "updateTrades", + "USER_BALANCE": "getTradingBalance", + "USER_ORDERS": "activeOrders", + "USER_TRADES": "report", + } + + # Timeouts + MESSAGE_TIMEOUT = 30.0 + PING_TIMEOUT = 10.0 + API_CALL_TIMEOUT = 10.0 + API_MAX_RETRIES = 4 + + # Intervals + # Only used when nothing is received from WS + SHORT_POLL_INTERVAL = 5.0 + # One minute should be fine since we get trades, orders and balances via WS + LONG_POLL_INTERVAL = 60.0 + UPDATE_ORDER_STATUS_INTERVAL = 60.0 + # 10 minute interval to update trading rules, these would likely never change whilst running. + INTERVAL_TRADING_RULES = 600 + + # Trading pair splitter regex + TRADING_PAIR_SPLITTER = r"^(\w+)(BTC|BCH|DAI|DDRST|EOSDT|EOS|ETH|EURS|IDRT|PAX|BUSD|GUSD|TUSD|USDC|USDT|USD)$" diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_exchange.py b/hummingbot/connector/exchange/hitbtc/hitbtc_exchange.py new file mode 100644 index 0000000000..9f6f83ec15 --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_exchange.py @@ -0,0 +1,877 @@ +import logging +from typing import ( + Dict, + List, + Optional, + Any, + AsyncIterable, +) +from decimal import Decimal +import asyncio +import aiohttp +import math +import time +from async_timeout import timeout + +from hummingbot.core.network_iterator import NetworkStatus +from hummingbot.logger import HummingbotLogger +from hummingbot.core.clock import Clock +from hummingbot.core.utils.async_utils import safe_ensure_future, safe_gather +from hummingbot.connector.trading_rule import TradingRule +from hummingbot.core.data_type.cancellation_result import CancellationResult +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.core.data_type.limit_order import LimitOrder +from hummingbot.core.event.events import ( + MarketEvent, + BuyOrderCompletedEvent, + SellOrderCompletedEvent, + OrderFilledEvent, + OrderCancelledEvent, + BuyOrderCreatedEvent, + SellOrderCreatedEvent, + MarketOrderFailureEvent, + OrderType, + TradeType, + TradeFee +) +from hummingbot.connector.exchange_base import ExchangeBase +from hummingbot.connector.exchange.hitbtc.hitbtc_order_book_tracker import HitbtcOrderBookTracker +from hummingbot.connector.exchange.hitbtc.hitbtc_user_stream_tracker import HitbtcUserStreamTracker +from hummingbot.connector.exchange.hitbtc.hitbtc_auth import HitbtcAuth +from hummingbot.connector.exchange.hitbtc.hitbtc_in_flight_order import HitbtcInFlightOrder +from hummingbot.connector.exchange.hitbtc.hitbtc_utils import ( + convert_from_exchange_trading_pair, + convert_to_exchange_trading_pair, + get_new_client_order_id, + aiohttp_response_with_errors, + retry_sleep_time, + str_date_to_ts, + HitbtcAPIError, +) +from hummingbot.connector.exchange.hitbtc.hitbtc_constants import Constants +from hummingbot.core.data_type.common import OpenOrder +ctce_logger = None +s_decimal_NaN = Decimal("nan") + + +class HitbtcExchange(ExchangeBase): + """ + HitbtcExchange connects with HitBTC exchange and provides order book pricing, user account tracking and + trading functionality. + """ + ORDER_NOT_EXIST_CONFIRMATION_COUNT = 3 + ORDER_NOT_EXIST_CANCEL_COUNT = 2 + + @classmethod + def logger(cls) -> HummingbotLogger: + global ctce_logger + if ctce_logger is None: + ctce_logger = logging.getLogger(__name__) + return ctce_logger + + def __init__(self, + hitbtc_api_key: str, + hitbtc_secret_key: str, + trading_pairs: Optional[List[str]] = None, + trading_required: bool = True + ): + """ + :param hitbtc_api_key: The API key to connect to private HitBTC APIs. + :param hitbtc_secret_key: The API secret. + :param trading_pairs: The market trading pairs which to track order book data. + :param trading_required: Whether actual trading is needed. + """ + super().__init__() + self._trading_required = trading_required + self._trading_pairs = trading_pairs + self._hitbtc_auth = HitbtcAuth(hitbtc_api_key, hitbtc_secret_key) + self._order_book_tracker = HitbtcOrderBookTracker(trading_pairs=trading_pairs) + self._user_stream_tracker = HitbtcUserStreamTracker(self._hitbtc_auth, trading_pairs) + self._ev_loop = asyncio.get_event_loop() + self._shared_client = None + self._poll_notifier = asyncio.Event() + self._last_timestamp = 0 + self._in_flight_orders = {} # Dict[client_order_id:str, HitbtcInFlightOrder] + self._order_not_found_records = {} # Dict[client_order_id:str, count:int] + self._trading_rules = {} # Dict[trading_pair:str, TradingRule] + self._status_polling_task = None + self._user_stream_event_listener_task = None + self._trading_rules_polling_task = None + self._last_poll_timestamp = 0 + + @property + def name(self) -> str: + return "hitbtc" + + @property + def order_books(self) -> Dict[str, OrderBook]: + return self._order_book_tracker.order_books + + @property + def trading_rules(self) -> Dict[str, TradingRule]: + return self._trading_rules + + @property + def in_flight_orders(self) -> Dict[str, HitbtcInFlightOrder]: + return self._in_flight_orders + + @property + def status_dict(self) -> Dict[str, bool]: + """ + A dictionary of statuses of various connector's components. + """ + return { + "order_books_initialized": self._order_book_tracker.ready, + "account_balance": len(self._account_balances) > 0 if self._trading_required else True, + "trading_rule_initialized": len(self._trading_rules) > 0, + "user_stream_initialized": + self._user_stream_tracker.data_source.last_recv_time > 0 if self._trading_required else True, + } + + @property + def ready(self) -> bool: + """ + :return True when all statuses pass, this might take 5-10 seconds for all the connector's components and + services to be ready. + """ + return all(self.status_dict.values()) + + @property + def limit_orders(self) -> List[LimitOrder]: + return [ + in_flight_order.to_limit_order() + for in_flight_order in self._in_flight_orders.values() + ] + + @property + def tracking_states(self) -> Dict[str, any]: + """ + :return active in-flight orders in json format, is used to save in sqlite db. + """ + return { + key: value.to_json() + for key, value in self._in_flight_orders.items() + if not value.is_done + } + + def restore_tracking_states(self, saved_states: Dict[str, any]): + """ + Restore in-flight orders from saved tracking states, this is st the connector can pick up on where it left off + when it disconnects. + :param saved_states: The saved tracking_states. + """ + self._in_flight_orders.update({ + key: HitbtcInFlightOrder.from_json(value) + for key, value in saved_states.items() + }) + + def supported_order_types(self) -> List[OrderType]: + """ + :return a list of OrderType supported by this connector. + Note that Market order type is no longer required and will not be used. + """ + return [OrderType.LIMIT, OrderType.LIMIT_MAKER] + + def start(self, clock: Clock, timestamp: float): + """ + This function is called automatically by the clock. + """ + super().start(clock, timestamp) + + def stop(self, clock: Clock): + """ + This function is called automatically by the clock. + """ + super().stop(clock) + + async def start_network(self): + """ + This function is required by NetworkIterator base class and is called automatically. + It starts tracking order book, polling trading rules, + updating statuses and tracking user data. + """ + self._order_book_tracker.start() + self._trading_rules_polling_task = safe_ensure_future(self._trading_rules_polling_loop()) + if self._trading_required: + self._status_polling_task = safe_ensure_future(self._status_polling_loop()) + self._user_stream_tracker_task = safe_ensure_future(self._user_stream_tracker.start()) + self._user_stream_event_listener_task = safe_ensure_future(self._user_stream_event_listener()) + + async def stop_network(self): + """ + This function is required by NetworkIterator base class and is called automatically. + """ + self._order_book_tracker.stop() + if self._status_polling_task is not None: + self._status_polling_task.cancel() + self._status_polling_task = None + if self._trading_rules_polling_task is not None: + self._trading_rules_polling_task.cancel() + self._trading_rules_polling_task = None + if self._status_polling_task is not None: + self._status_polling_task.cancel() + self._status_polling_task = None + if self._user_stream_tracker_task is not None: + self._user_stream_tracker_task.cancel() + self._user_stream_tracker_task = None + if self._user_stream_event_listener_task is not None: + self._user_stream_event_listener_task.cancel() + self._user_stream_event_listener_task = None + + async def check_network(self) -> NetworkStatus: + """ + This function is required by NetworkIterator base class and is called periodically to check + the network connection. Simply ping the network (or call any light weight public API). + """ + try: + # since there is no ping endpoint, the lowest rate call is to get BTC-USD symbol + await self._api_request("GET", + Constants.ENDPOINT['SYMBOL'], + params={'symbols': 'BTCUSD'}) + except asyncio.CancelledError: + raise + except Exception: + return NetworkStatus.NOT_CONNECTED + return NetworkStatus.CONNECTED + + async def _http_client(self) -> aiohttp.ClientSession: + """ + :returns Shared client session instance + """ + if self._shared_client is None: + self._shared_client = aiohttp.ClientSession() + return self._shared_client + + async def _trading_rules_polling_loop(self): + """ + Periodically update trading rule. + """ + while True: + try: + await self._update_trading_rules() + await asyncio.sleep(Constants.INTERVAL_TRADING_RULES) + except asyncio.CancelledError: + raise + except Exception as e: + self.logger().network(f"Unexpected error while fetching trading rules. Error: {str(e)}", + exc_info=True, + app_warning_msg=("Could not fetch new trading rules from " + f"{Constants.EXCHANGE_NAME}. Check network connection.")) + await asyncio.sleep(0.5) + + async def _update_trading_rules(self): + symbols_info = await self._api_request("GET", endpoint=Constants.ENDPOINT['SYMBOL']) + self._trading_rules.clear() + self._trading_rules = self._format_trading_rules(symbols_info) + + def _format_trading_rules(self, symbols_info: Dict[str, Any]) -> Dict[str, TradingRule]: + """ + Converts json API response into a dictionary of trading rules. + :param symbols_info: The json API response + :return A dictionary of trading rules. + Response Example: + [ + { + id: "BTCUSD", + baseCurrency: "BTC", + quoteCurrency: "USD", + quantityIncrement: "0.00001", + tickSize: "0.01", + takeLiquidityRate: "0.0025", + provideLiquidityRate: "0.001", + feeCurrency: "USD", + marginTrading: true, + maxInitialLeverage: "12.00" + } + ] + """ + result = {} + for rule in symbols_info: + try: + trading_pair = convert_from_exchange_trading_pair(rule["id"]) + price_step = Decimal(str(rule["tickSize"])) + size_step = Decimal(str(rule["quantityIncrement"])) + result[trading_pair] = TradingRule(trading_pair, + min_order_size=size_step, + min_base_amount_increment=size_step, + min_price_increment=price_step) + except Exception: + self.logger().error(f"Error parsing the trading pair rule {rule}. Skipping.", exc_info=True) + return result + + async def _api_request(self, + method: str, + endpoint: str, + params: Optional[Dict[str, Any]] = None, + is_auth_required: bool = False, + try_count: int = 0) -> Dict[str, Any]: + """ + Sends an aiohttp request and waits for a response. + :param method: The HTTP method, e.g. get or post + :param endpoint: The path url or the API end point + :param params: Additional get/post parameters + :param is_auth_required: Whether an authentication is required, when True the function will add encrypted + signature to the request. + :returns A response in json format. + """ + url = f"{Constants.REST_URL}/{endpoint}" + shared_client = await self._http_client() + # Turn `params` into either GET params or POST body data + qs_params: dict = params if method.upper() == "GET" else None + req_form = aiohttp.FormData(params) if method.upper() == "POST" and params is not None else None + # Generate auth headers if needed. + headers: dict = {"Content-Type": "application/x-www-form-urlencoded"} + if is_auth_required: + headers: dict = self._hitbtc_auth.get_headers(method, f"{Constants.REST_URL_AUTH}/{endpoint}", + params) + # Build request coro + response_coro = shared_client.request(method=method.upper(), url=url, headers=headers, + params=qs_params, data=req_form, + timeout=Constants.API_CALL_TIMEOUT) + http_status, parsed_response, request_errors = await aiohttp_response_with_errors(response_coro) + if request_errors or parsed_response is None: + if try_count < Constants.API_MAX_RETRIES: + try_count += 1 + time_sleep = retry_sleep_time(try_count) + self.logger().info(f"Error fetching data from {url}. HTTP status is {http_status}. " + f"Retrying in {time_sleep:.0f}s.") + await asyncio.sleep(time_sleep) + return await self._api_request(method=method, endpoint=endpoint, params=params, + is_auth_required=is_auth_required, try_count=try_count) + else: + raise HitbtcAPIError({"error": parsed_response, "status": http_status}) + if "error" in parsed_response: + raise HitbtcAPIError(parsed_response) + return parsed_response + + def get_order_price_quantum(self, trading_pair: str, price: Decimal): + """ + Returns a price step, a minimum price increment for a given trading pair. + """ + trading_rule = self._trading_rules[trading_pair] + return trading_rule.min_price_increment + + def get_order_size_quantum(self, trading_pair: str, order_size: Decimal): + """ + Returns an order amount step, a minimum amount increment for a given trading pair. + """ + trading_rule = self._trading_rules[trading_pair] + return Decimal(trading_rule.min_base_amount_increment) + + def get_order_book(self, trading_pair: str) -> OrderBook: + if trading_pair not in self._order_book_tracker.order_books: + raise ValueError(f"No order book exists for '{trading_pair}'.") + return self._order_book_tracker.order_books[trading_pair] + + def buy(self, trading_pair: str, amount: Decimal, order_type=OrderType.MARKET, + price: Decimal = s_decimal_NaN, **kwargs) -> str: + """ + Buys an amount of base asset (of the given trading pair). This function returns immediately. + To see an actual order, you'll have to wait for BuyOrderCreatedEvent. + :param trading_pair: The market (e.g. BTC-USDT) to buy from + :param amount: The amount in base token value + :param order_type: The order type + :param price: The price (note: this is no longer optional) + :returns A new internal order id + """ + order_id: str = get_new_client_order_id(True, trading_pair) + safe_ensure_future(self._create_order(TradeType.BUY, order_id, trading_pair, amount, order_type, price)) + return order_id + + def sell(self, trading_pair: str, amount: Decimal, order_type=OrderType.MARKET, + price: Decimal = s_decimal_NaN, **kwargs) -> str: + """ + Sells an amount of base asset (of the given trading pair). This function returns immediately. + To see an actual order, you'll have to wait for SellOrderCreatedEvent. + :param trading_pair: The market (e.g. BTC-USDT) to sell from + :param amount: The amount in base token value + :param order_type: The order type + :param price: The price (note: this is no longer optional) + :returns A new internal order id + """ + order_id: str = get_new_client_order_id(False, trading_pair) + safe_ensure_future(self._create_order(TradeType.SELL, order_id, trading_pair, amount, order_type, price)) + return order_id + + def cancel(self, trading_pair: str, order_id: str): + """ + Cancel an order. This function returns immediately. + To get the cancellation result, you'll have to wait for OrderCancelledEvent. + :param trading_pair: The market (e.g. BTC-USDT) of the order. + :param order_id: The internal order id (also called client_order_id) + """ + safe_ensure_future(self._execute_cancel(trading_pair, order_id)) + return order_id + + async def _create_order(self, + trade_type: TradeType, + order_id: str, + trading_pair: str, + amount: Decimal, + order_type: OrderType, + price: Decimal): + """ + Calls create-order API end point to place an order, starts tracking the order and triggers order created event. + :param trade_type: BUY or SELL + :param order_id: Internal order id (also called client_order_id) + :param trading_pair: The market to place order + :param amount: The order amount (in base token value) + :param order_type: The order type + :param price: The order price + """ + if not order_type.is_limit_type(): + raise Exception(f"Unsupported order type: {order_type}") + trading_rule = self._trading_rules[trading_pair] + + amount = self.quantize_order_amount(trading_pair, amount) + price = self.quantize_order_price(trading_pair, price) + if amount < trading_rule.min_order_size: + raise ValueError(f"Buy order amount {amount} is lower than the minimum order size " + f"{trading_rule.min_order_size}.") + order_type_str = order_type.name.lower().split("_")[0] + api_params = {"symbol": convert_to_exchange_trading_pair(trading_pair), + "side": trade_type.name.lower(), + "type": order_type_str, + "price": f"{price:f}", + "quantity": f"{amount:f}", + "clientOrderId": order_id, + # Without strict validate, HitBTC might adjust order prices/sizes. + "strictValidate": "true", + } + if order_type is OrderType.LIMIT_MAKER: + api_params["postOnly"] = "true" + self.start_tracking_order(order_id, None, trading_pair, trade_type, price, amount, order_type) + try: + order_result = await self._api_request("POST", Constants.ENDPOINT["ORDER_CREATE"], api_params, True) + exchange_order_id = str(order_result["id"]) + tracked_order = self._in_flight_orders.get(order_id) + if tracked_order is not None: + self.logger().info(f"Created {order_type.name} {trade_type.name} order {order_id} for " + f"{amount} {trading_pair}.") + tracked_order.update_exchange_order_id(exchange_order_id) + if trade_type is TradeType.BUY: + event_tag = MarketEvent.BuyOrderCreated + event_cls = BuyOrderCreatedEvent + else: + event_tag = MarketEvent.SellOrderCreated + event_cls = SellOrderCreatedEvent + self.trigger_event(event_tag, + event_cls(self.current_timestamp, order_type, trading_pair, amount, price, order_id)) + except asyncio.CancelledError: + raise + except HitbtcAPIError as e: + error_reason = e.error_payload.get('error', {}).get('message') + self.stop_tracking_order(order_id) + self.logger().network( + f"Error submitting {trade_type.name} {order_type.name} order to {Constants.EXCHANGE_NAME} for " + f"{amount} {trading_pair} {price} - {error_reason}.", + exc_info=True, + app_warning_msg=(f"Error submitting order to {Constants.EXCHANGE_NAME} - {error_reason}.") + ) + self.trigger_event(MarketEvent.OrderFailure, + MarketOrderFailureEvent(self.current_timestamp, order_id, order_type)) + + def start_tracking_order(self, + order_id: str, + exchange_order_id: str, + trading_pair: str, + trade_type: TradeType, + price: Decimal, + amount: Decimal, + order_type: OrderType): + """ + Starts tracking an order by simply adding it into _in_flight_orders dictionary. + """ + self._in_flight_orders[order_id] = HitbtcInFlightOrder( + client_order_id=order_id, + exchange_order_id=exchange_order_id, + trading_pair=trading_pair, + order_type=order_type, + trade_type=trade_type, + price=price, + amount=amount + ) + + def stop_tracking_order(self, order_id: str): + """ + Stops tracking an order by simply removing it from _in_flight_orders dictionary. + """ + if order_id in self._in_flight_orders: + del self._in_flight_orders[order_id] + if order_id in self._order_not_found_records: + del self._order_not_found_records[order_id] + + async def _execute_cancel(self, trading_pair: str, order_id: str) -> str: + """ + Executes order cancellation process by first calling cancel-order API. The API result doesn't confirm whether + the cancellation is successful, it simply states it receives the request. + :param trading_pair: The market trading pair (Unused during cancel on HitBTC) + :param order_id: The internal order id + order.last_state to change to CANCELED + """ + order_was_cancelled = False + try: + tracked_order = self._in_flight_orders.get(order_id) + if tracked_order is None: + raise ValueError(f"Failed to cancel order - {order_id}. Order not found.") + if tracked_order.exchange_order_id is None: + await tracked_order.get_exchange_order_id() + # ex_order_id = tracked_order.exchange_order_id + await self._api_request("DELETE", + Constants.ENDPOINT["ORDER_DELETE"].format(id=order_id), + is_auth_required=True) + order_was_cancelled = True + except asyncio.CancelledError: + raise + except HitbtcAPIError as e: + err = e.error_payload.get('error', e.error_payload) + self._order_not_found_records[order_id] = self._order_not_found_records.get(order_id, 0) + 1 + if err.get('code') == 20002 and \ + self._order_not_found_records[order_id] >= self.ORDER_NOT_EXIST_CANCEL_COUNT: + order_was_cancelled = True + if order_was_cancelled: + self.logger().info(f"Successfully cancelled order {order_id} on {Constants.EXCHANGE_NAME}.") + self.stop_tracking_order(order_id) + self.trigger_event(MarketEvent.OrderCancelled, + OrderCancelledEvent(self.current_timestamp, order_id)) + tracked_order.cancelled_event.set() + return CancellationResult(order_id, True) + else: + self.logger().network( + f"Failed to cancel order {order_id}: {err.get('message', str(err))}", + exc_info=True, + app_warning_msg=f"Failed to cancel the order {order_id} on {Constants.EXCHANGE_NAME}. " + f"Check API key and network connection." + ) + return CancellationResult(order_id, False) + + async def _status_polling_loop(self): + """ + Periodically update user balances and order status via REST API. This serves as a fallback measure for web + socket API updates. + """ + while True: + try: + self._poll_notifier = asyncio.Event() + await self._poll_notifier.wait() + await safe_gather( + self._update_balances(), + self._update_order_status(), + ) + self._last_poll_timestamp = self.current_timestamp + except asyncio.CancelledError: + raise + except Exception as e: + self.logger().error(str(e), exc_info=True) + warn_msg = (f"Could not fetch account updates from {Constants.EXCHANGE_NAME}. " + "Check API key and network connection.") + self.logger().network("Unexpected error while fetching account updates.", exc_info=True, + app_warning_msg=warn_msg) + await asyncio.sleep(0.5) + + async def _update_balances(self): + """ + Calls REST API to update total and available balances. + """ + account_info = await self._api_request("GET", Constants.ENDPOINT["USER_BALANCES"], is_auth_required=True) + self._process_balance_message(account_info) + + async def _update_order_status(self): + """ + Calls REST API to get status update for each in-flight order. + """ + last_tick = int(self._last_poll_timestamp / Constants.UPDATE_ORDER_STATUS_INTERVAL) + current_tick = int(self.current_timestamp / Constants.UPDATE_ORDER_STATUS_INTERVAL) + + if current_tick > last_tick and len(self._in_flight_orders) > 0: + tracked_orders = list(self._in_flight_orders.values()) + tasks = [] + for tracked_order in tracked_orders: + # exchange_order_id = await tracked_order.get_exchange_order_id() + order_id = tracked_order.client_order_id + tasks.append(self._api_request("GET", + Constants.ENDPOINT["ORDER_STATUS"].format(id=order_id), + is_auth_required=True)) + self.logger().debug(f"Polling for order status updates of {len(tasks)} orders.") + responses = await safe_gather(*tasks, return_exceptions=True) + for response, tracked_order in zip(responses, tracked_orders): + client_order_id = tracked_order.client_order_id + if isinstance(response, HitbtcAPIError): + err = response.error_payload.get('error', response.error_payload) + if err.get('code') == 20002: + self._order_not_found_records[client_order_id] = \ + self._order_not_found_records.get(client_order_id, 0) + 1 + if self._order_not_found_records[client_order_id] < self.ORDER_NOT_EXIST_CONFIRMATION_COUNT: + # Wait until the order not found error have repeated a few times before actually treating + # it as failed. See: https://github.com/CoinAlpha/hummingbot/issues/601 + continue + self.trigger_event(MarketEvent.OrderFailure, + MarketOrderFailureEvent( + self.current_timestamp, client_order_id, tracked_order.order_type)) + self.stop_tracking_order(client_order_id) + else: + continue + elif "clientOrderId" not in response: + self.logger().info(f"_update_order_status clientOrderId not in resp: {response}") + continue + else: + self._process_order_message(response) + + def _process_order_message(self, order_msg: Dict[str, Any]): + """ + Updates in-flight order and triggers cancellation or failure event if needed. + :param order_msg: The order response from either REST or web socket API (they are of the same format) + Example Order: + { + "id": "4345613661", + "clientOrderId": "57d5525562c945448e3cbd559bd068c3", + "symbol": "BCCBTC", + "side": "sell", + "status": "new", + "type": "limit", + "timeInForce": "GTC", + "quantity": "0.013", + "price": "0.100000", + "cumQuantity": "0.000", + "postOnly": false, + "createdAt": "2017-10-20T12:17:12.245Z", + "updatedAt": "2017-10-20T12:17:12.245Z", + "reportType": "status" + } + """ + client_order_id = order_msg["clientOrderId"] + if client_order_id not in self._in_flight_orders: + return + tracked_order = self._in_flight_orders[client_order_id] + # Update order execution status + tracked_order.last_state = order_msg["status"] + # update order + tracked_order.executed_amount_base = Decimal(order_msg["cumQuantity"]) + tracked_order.executed_amount_quote = Decimal(order_msg["price"]) * Decimal(order_msg["cumQuantity"]) + + if tracked_order.is_cancelled: + self.logger().info(f"Successfully cancelled order {client_order_id}.") + self.stop_tracking_order(client_order_id) + self.trigger_event(MarketEvent.OrderCancelled, + OrderCancelledEvent(self.current_timestamp, client_order_id)) + tracked_order.cancelled_event.set() + elif tracked_order.is_failure: + self.logger().info(f"The market order {client_order_id} has failed according to order status API. ") + self.trigger_event(MarketEvent.OrderFailure, + MarketOrderFailureEvent( + self.current_timestamp, client_order_id, tracked_order.order_type)) + self.stop_tracking_order(client_order_id) + + async def _process_trade_message(self, trade_msg: Dict[str, Any]): + """ + Updates in-flight order and trigger order filled event for trade message received. Triggers order completed + event if the total executed amount equals to the specified order amount. + Example Trade: + { + "id": "4345697765", + "clientOrderId": "53b7cf917963464a811a4af426102c19", + "symbol": "ETHBTC", + "side": "sell", + "status": "filled", + "type": "limit", + "timeInForce": "GTC", + "quantity": "0.001", + "price": "0.053868", + "cumQuantity": "0.001", + "postOnly": false, + "createdAt": "2017-10-20T12:20:05.952Z", + "updatedAt": "2017-10-20T12:20:38.708Z", + "reportType": "trade", + "tradeQuantity": "0.001", + "tradePrice": "0.053868", + "tradeId": 55051694, + "tradeFee": "-0.000000005" + } + """ + tracked_orders = list(self._in_flight_orders.values()) + for order in tracked_orders: + await order.get_exchange_order_id() + track_order = [o for o in tracked_orders if trade_msg["id"] == o.exchange_order_id] + if not track_order: + return + tracked_order = track_order[0] + updated = tracked_order.update_with_trade_update(trade_msg) + if not updated: + return + self.trigger_event( + MarketEvent.OrderFilled, + OrderFilledEvent( + self.current_timestamp, + tracked_order.client_order_id, + tracked_order.trading_pair, + tracked_order.trade_type, + tracked_order.order_type, + Decimal(str(trade_msg.get("tradePrice", "0"))), + Decimal(str(trade_msg.get("tradeQuantity", "0"))), + TradeFee(0.0, [(tracked_order.quote_asset, Decimal(str(trade_msg.get("tradeFee", "0"))))]), + exchange_trade_id=trade_msg["id"] + ) + ) + if math.isclose(tracked_order.executed_amount_base, tracked_order.amount) or \ + tracked_order.executed_amount_base >= tracked_order.amount or \ + tracked_order.is_done: + tracked_order.last_state = "FILLED" + self.logger().info(f"The {tracked_order.trade_type.name} order " + f"{tracked_order.client_order_id} has completed " + f"according to order status API.") + event_tag = MarketEvent.BuyOrderCompleted if tracked_order.trade_type is TradeType.BUY \ + else MarketEvent.SellOrderCompleted + event_class = BuyOrderCompletedEvent if tracked_order.trade_type is TradeType.BUY \ + else SellOrderCompletedEvent + await asyncio.sleep(0.1) + self.trigger_event(event_tag, + event_class(self.current_timestamp, + tracked_order.client_order_id, + tracked_order.base_asset, + tracked_order.quote_asset, + tracked_order.fee_asset, + tracked_order.executed_amount_base, + tracked_order.executed_amount_quote, + tracked_order.fee_paid, + tracked_order.order_type)) + self.stop_tracking_order(tracked_order.client_order_id) + + def _process_balance_message(self, balance_update): + local_asset_names = set(self._account_balances.keys()) + remote_asset_names = set() + for account in balance_update: + asset_name = account["currency"] + self._account_available_balances[asset_name] = Decimal(str(account["available"])) + self._account_balances[asset_name] = Decimal(str(account["reserved"])) + Decimal(str(account["available"])) + remote_asset_names.add(asset_name) + + asset_names_to_remove = local_asset_names.difference(remote_asset_names) + for asset_name in asset_names_to_remove: + del self._account_available_balances[asset_name] + del self._account_balances[asset_name] + + async def cancel_all(self, timeout_seconds: float) -> List[CancellationResult]: + """ + Cancels all in-flight orders and waits for cancellation results. + Used by bot's top level stop and exit commands (cancelling outstanding orders on exit) + :param timeout_seconds: The timeout at which the operation will be canceled. + :returns List of CancellationResult which indicates whether each order is successfully cancelled. + """ + if self._trading_pairs is None: + raise Exception("cancel_all can only be used when trading_pairs are specified.") + open_orders = [o for o in self._in_flight_orders.values() if not o.is_done] + if len(open_orders) == 0: + return [] + tasks = [self._execute_cancel(o.trading_pair, o.client_order_id) for o in open_orders] + cancellation_results = [] + try: + async with timeout(timeout_seconds): + cancellation_results = await safe_gather(*tasks, return_exceptions=False) + except Exception: + self.logger().network( + "Unexpected error cancelling orders.", exc_info=True, + app_warning_msg=(f"Failed to cancel all orders on {Constants.EXCHANGE_NAME}. " + "Check API key and network connection.") + ) + return cancellation_results + + def tick(self, timestamp: float): + """ + Is called automatically by the clock for each clock's tick (1 second by default). + It checks if status polling task is due for execution. + """ + now = time.time() + poll_interval = (Constants.SHORT_POLL_INTERVAL + if now - self._user_stream_tracker.last_recv_time > 60.0 + else Constants.LONG_POLL_INTERVAL) + last_tick = int(self._last_timestamp / poll_interval) + current_tick = int(timestamp / poll_interval) + if current_tick > last_tick: + if not self._poll_notifier.is_set(): + self._poll_notifier.set() + self._last_timestamp = timestamp + + def get_fee(self, + base_currency: str, + quote_currency: str, + order_type: OrderType, + order_side: TradeType, + amount: Decimal, + price: Decimal = s_decimal_NaN) -> TradeFee: + """ + To get trading fee, this function is simplified by using fee override configuration. Most parameters to this + function are ignore except order_type. Use OrderType.LIMIT_MAKER to specify you want trading fee for + maker order. + """ + is_maker = order_type is OrderType.LIMIT_MAKER + return TradeFee(percent=self.estimate_fee_pct(is_maker)) + + async def _iter_user_event_queue(self) -> AsyncIterable[Dict[str, any]]: + while True: + try: + yield await self._user_stream_tracker.user_stream.get() + except asyncio.CancelledError: + raise + except Exception: + self.logger().network( + "Unknown error. Retrying after 1 seconds.", exc_info=True, + app_warning_msg=(f"Could not fetch user events from {Constants.EXCHANGE_NAME}. " + "Check API key and network connection.")) + await asyncio.sleep(1.0) + + async def _user_stream_event_listener(self): + """ + Listens to message in _user_stream_tracker.user_stream queue. The messages are put in by + HitbtcAPIUserStreamDataSource. + """ + async for event_message in self._iter_user_event_queue(): + try: + event_methods = [ + Constants.WS_METHODS["USER_ORDERS"], + Constants.WS_METHODS["USER_TRADES"], + ] + method: str = event_message.get("method", None) + params: str = event_message.get("params", None) + account_balances: list = event_message.get("result", None) + + if method not in event_methods and account_balances is None: + self.logger().error(f"Unexpected message in user stream: {event_message}.", exc_info=True) + continue + if method == Constants.WS_METHODS["USER_TRADES"]: + await self._process_trade_message(params) + elif method == Constants.WS_METHODS["USER_ORDERS"]: + for order_msg in params: + self._process_order_message(order_msg) + elif isinstance(account_balances, list) and "currency" in account_balances[0]: + self._process_balance_message(account_balances) + except asyncio.CancelledError: + raise + except Exception: + self.logger().error("Unexpected error in user stream listener loop.", exc_info=True) + await asyncio.sleep(5.0) + + # This is currently unused, but looks like a future addition. + async def get_open_orders(self) -> List[OpenOrder]: + result = await self._api_request("GET", Constants.ENDPOINT["USER_ORDERS"], is_auth_required=True) + ret_val = [] + for order in result: + if Constants.HBOT_BROKER_ID not in order["clientOrderId"]: + continue + if order["type"] != OrderType.LIMIT.name.lower(): + self.logger().info(f"Unsupported order type found: {order['type']}") + continue + ret_val.append( + OpenOrder( + client_order_id=order["clientOrderId"], + trading_pair=convert_from_exchange_trading_pair(order["symbol"]), + price=Decimal(str(order["price"])), + amount=Decimal(str(order["quantity"])), + executed_amount=Decimal(str(order["cumQuantity"])), + status=order["status"], + order_type=OrderType.LIMIT, + is_buy=True if order["side"].lower() == TradeType.BUY.name.lower() else False, + time=str_date_to_ts(order["createdAt"]), + exchange_order_id=order["id"] + ) + ) + return ret_val diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_in_flight_order.py b/hummingbot/connector/exchange/hitbtc/hitbtc_in_flight_order.py new file mode 100644 index 0000000000..54766be2f1 --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_in_flight_order.py @@ -0,0 +1,118 @@ +from decimal import Decimal +from typing import ( + Any, + Dict, + Optional, +) +import asyncio +from hummingbot.core.event.events import ( + OrderType, + TradeType +) +from hummingbot.connector.in_flight_order_base import InFlightOrderBase + +s_decimal_0 = Decimal(0) + + +class HitbtcInFlightOrder(InFlightOrderBase): + def __init__(self, + client_order_id: str, + exchange_order_id: Optional[str], + trading_pair: str, + order_type: OrderType, + trade_type: TradeType, + price: Decimal, + amount: Decimal, + initial_state: str = "new"): + super().__init__( + client_order_id, + exchange_order_id, + trading_pair, + order_type, + trade_type, + price, + amount, + initial_state, + ) + self.trade_id_set = set() + self.cancelled_event = asyncio.Event() + + @property + def is_done(self) -> bool: + return self.last_state in {"filled", "canceled", "expired"} + + @property + def is_failure(self) -> bool: + return self.last_state in {"suspended"} + + @property + def is_cancelled(self) -> bool: + return self.last_state in {"canceled", "expired"} + + @classmethod + def from_json(cls, data: Dict[str, Any]) -> InFlightOrderBase: + """ + :param data: json data from API + :return: formatted InFlightOrder + """ + retval = HitbtcInFlightOrder( + data["client_order_id"], + data["exchange_order_id"], + data["trading_pair"], + getattr(OrderType, data["order_type"]), + getattr(TradeType, data["trade_type"]), + Decimal(data["price"]), + Decimal(data["amount"]), + data["last_state"] + ) + retval.executed_amount_base = Decimal(data["executed_amount_base"]) + retval.executed_amount_quote = Decimal(data["executed_amount_quote"]) + retval.fee_asset = data["fee_asset"] + retval.fee_paid = Decimal(data["fee_paid"]) + retval.last_state = data["last_state"] + return retval + + def update_with_trade_update(self, trade_update: Dict[str, Any]) -> bool: + """ + Updates the in flight order with trade update (from private/get-order-detail end point) + return: True if the order gets updated otherwise False + Example Trade: + { + "id": "4345697765", + "clientOrderId": "53b7cf917963464a811a4af426102c19", + "symbol": "ETHBTC", + "side": "sell", + "status": "filled", + "type": "limit", + "timeInForce": "GTC", + "quantity": "0.001", + "price": "0.053868", + "cumQuantity": "0.001", + "postOnly": false, + "createdAt": "2017-10-20T12:20:05.952Z", + "updatedAt": "2017-10-20T12:20:38.708Z", + "reportType": "trade", + } + ... Trade variables are only included after fills. + { + "tradeQuantity": "0.001", + "tradePrice": "0.053868", + "tradeId": 55051694, + "tradeFee": "-0.000000005" + } + """ + self.executed_amount_base = Decimal(str(trade_update["cumQuantity"])) + if self.executed_amount_base <= s_decimal_0: + # No trades executed yet. + return False + trade_id = trade_update["updatedAt"] + if trade_id in self.trade_id_set: + # trade already recorded + return False + self.trade_id_set.add(trade_id) + self.fee_paid += Decimal(str(trade_update.get("tradeFee", "0"))) + self.executed_amount_quote += (Decimal(str(trade_update.get("tradePrice", "0"))) * + Decimal(str(trade_update.get("tradeQuantity", "0")))) + if not self.fee_asset: + self.fee_asset = self.quote_asset + return True diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_order_book.py b/hummingbot/connector/exchange/hitbtc/hitbtc_order_book.py new file mode 100644 index 0000000000..1a3c91a121 --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_order_book.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python + +import logging +from hummingbot.connector.exchange.hitbtc.hitbtc_constants import Constants + +from sqlalchemy.engine import RowProxy +from typing import ( + Optional, + Dict, + List, Any) +from hummingbot.logger import HummingbotLogger +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.core.data_type.order_book_message import ( + OrderBookMessage, OrderBookMessageType +) +from hummingbot.connector.exchange.hitbtc.hitbtc_order_book_message import HitbtcOrderBookMessage + +_logger = None + + +class HitbtcOrderBook(OrderBook): + @classmethod + def logger(cls) -> HummingbotLogger: + global _logger + if _logger is None: + _logger = logging.getLogger(__name__) + return _logger + + @classmethod + def snapshot_message_from_exchange(cls, + msg: Dict[str, any], + timestamp: float, + metadata: Optional[Dict] = None): + """ + Convert json snapshot data into standard OrderBookMessage format + :param msg: json snapshot data from live web socket stream + :param timestamp: timestamp attached to incoming data + :return: HitbtcOrderBookMessage + """ + + if metadata: + msg.update(metadata) + + return HitbtcOrderBookMessage( + message_type=OrderBookMessageType.SNAPSHOT, + content=msg, + timestamp=timestamp + ) + + @classmethod + def snapshot_message_from_db(cls, record: RowProxy, metadata: Optional[Dict] = None): + """ + *used for backtesting + Convert a row of snapshot data into standard OrderBookMessage format + :param record: a row of snapshot data from the database + :return: HitbtcOrderBookMessage + """ + return HitbtcOrderBookMessage( + message_type=OrderBookMessageType.SNAPSHOT, + content=record.json, + timestamp=record.timestamp + ) + + @classmethod + def diff_message_from_exchange(cls, + msg: Dict[str, any], + timestamp: Optional[float] = None, + metadata: Optional[Dict] = None): + """ + Convert json diff data into standard OrderBookMessage format + :param msg: json diff data from live web socket stream + :param timestamp: timestamp attached to incoming data + :return: HitbtcOrderBookMessage + """ + + if metadata: + msg.update(metadata) + + return HitbtcOrderBookMessage( + message_type=OrderBookMessageType.DIFF, + content=msg, + timestamp=timestamp + ) + + @classmethod + def diff_message_from_db(cls, record: RowProxy, metadata: Optional[Dict] = None): + """ + *used for backtesting + Convert a row of diff data into standard OrderBookMessage format + :param record: a row of diff data from the database + :return: HitbtcOrderBookMessage + """ + return HitbtcOrderBookMessage( + message_type=OrderBookMessageType.DIFF, + content=record.json, + timestamp=record.timestamp + ) + + @classmethod + def trade_message_from_exchange(cls, + msg: Dict[str, Any], + timestamp: Optional[float] = None, + metadata: Optional[Dict] = None): + """ + Convert a trade data into standard OrderBookMessage format + :param record: a trade data from the database + :return: HitbtcOrderBookMessage + """ + + if metadata: + msg.update(metadata) + + msg.update({ + "exchange_order_id": msg.get("id"), + "trade_type": msg.get("side"), + "price": msg.get("price"), + "amount": msg.get("quantity"), + }) + + return HitbtcOrderBookMessage( + message_type=OrderBookMessageType.TRADE, + content=msg, + timestamp=timestamp + ) + + @classmethod + def trade_message_from_db(cls, record: RowProxy, metadata: Optional[Dict] = None): + """ + *used for backtesting + Convert a row of trade data into standard OrderBookMessage format + :param record: a row of trade data from the database + :return: HitbtcOrderBookMessage + """ + return HitbtcOrderBookMessage( + message_type=OrderBookMessageType.TRADE, + content=record.json, + timestamp=record.timestamp + ) + + @classmethod + def from_snapshot(cls, snapshot: OrderBookMessage): + raise NotImplementedError(Constants.EXCHANGE_NAME + " order book needs to retain individual order data.") + + @classmethod + def restore_from_snapshot_and_diffs(self, snapshot: OrderBookMessage, diffs: List[OrderBookMessage]): + raise NotImplementedError(Constants.EXCHANGE_NAME + " order book needs to retain individual order data.") diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_order_book_message.py b/hummingbot/connector/exchange/hitbtc/hitbtc_order_book_message.py new file mode 100644 index 0000000000..fdc207d64d --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_order_book_message.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python + +from typing import ( + Dict, + List, + Optional, +) + +from hummingbot.core.data_type.order_book_row import OrderBookRow +from hummingbot.core.data_type.order_book_message import ( + OrderBookMessage, + OrderBookMessageType, +) +from .hitbtc_constants import Constants +from .hitbtc_utils import ( + convert_from_exchange_trading_pair, +) + + +class HitbtcOrderBookMessage(OrderBookMessage): + def __new__( + cls, + message_type: OrderBookMessageType, + content: Dict[str, any], + timestamp: Optional[float] = None, + *args, + **kwargs, + ): + if timestamp is None: + if message_type is OrderBookMessageType.SNAPSHOT: + raise ValueError("timestamp must not be None when initializing snapshot messages.") + timestamp = content["timestamp"] + + return super(HitbtcOrderBookMessage, cls).__new__( + cls, message_type, content, timestamp=timestamp, *args, **kwargs + ) + + @property + def update_id(self) -> int: + if self.type in [OrderBookMessageType.DIFF, OrderBookMessageType.SNAPSHOT]: + return int(self.timestamp * 1e3) + else: + return -1 + + @property + def trade_id(self) -> int: + if self.type is OrderBookMessageType.TRADE: + return int(self.timestamp * 1e3) + return -1 + + @property + def trading_pair(self) -> str: + if "trading_pair" in self.content: + return self.content["trading_pair"] + elif "symbol" in self.content: + return convert_from_exchange_trading_pair(self.content["symbol"]) + + # The `asks` and `bids` properties are only used in the methods below. + # They are all replaced or unused in this connector: + # OrderBook.restore_from_snapshot_and_diffs + # OrderBookTracker._track_single_book + # MockAPIOrderBookDataSource.get_tracking_pairs + @property + def asks(self) -> List[OrderBookRow]: + raise NotImplementedError(Constants.EXCHANGE_NAME + " order book uses active_order_tracker.") + + @property + def bids(self) -> List[OrderBookRow]: + raise NotImplementedError(Constants.EXCHANGE_NAME + " order book uses active_order_tracker.") + + def __eq__(self, other) -> bool: + return self.type == other.type and self.timestamp == other.timestamp + + def __lt__(self, other) -> bool: + if self.timestamp != other.timestamp: + return self.timestamp < other.timestamp + else: + """ + If timestamp is the same, the ordering is snapshot < diff < trade + """ + return self.type.value < other.type.value diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_order_book_tracker.py b/hummingbot/connector/exchange/hitbtc/hitbtc_order_book_tracker.py new file mode 100644 index 0000000000..d3161de17e --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_order_book_tracker.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +import asyncio +import bisect +import logging +from hummingbot.connector.exchange.hitbtc.hitbtc_constants import Constants +import time + +from collections import defaultdict, deque +from typing import Optional, Dict, List, Deque +from hummingbot.core.data_type.order_book_message import OrderBookMessageType +from hummingbot.logger import HummingbotLogger +from hummingbot.core.data_type.order_book_tracker import OrderBookTracker +from hummingbot.connector.exchange.hitbtc.hitbtc_order_book_message import HitbtcOrderBookMessage +from hummingbot.connector.exchange.hitbtc.hitbtc_active_order_tracker import HitbtcActiveOrderTracker +from hummingbot.connector.exchange.hitbtc.hitbtc_api_order_book_data_source import HitbtcAPIOrderBookDataSource +from hummingbot.connector.exchange.hitbtc.hitbtc_order_book import HitbtcOrderBook + + +class HitbtcOrderBookTracker(OrderBookTracker): + _logger: Optional[HummingbotLogger] = None + + @classmethod + def logger(cls) -> HummingbotLogger: + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + def __init__(self, trading_pairs: Optional[List[str]] = None,): + super().__init__(HitbtcAPIOrderBookDataSource(trading_pairs), trading_pairs) + + self._ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop() + self._order_book_snapshot_stream: asyncio.Queue = asyncio.Queue() + self._order_book_diff_stream: asyncio.Queue = asyncio.Queue() + self._order_book_trade_stream: asyncio.Queue = asyncio.Queue() + self._process_msg_deque_task: Optional[asyncio.Task] = None + self._past_diffs_windows: Dict[str, Deque] = {} + self._order_books: Dict[str, HitbtcOrderBook] = {} + self._saved_message_queues: Dict[str, Deque[HitbtcOrderBookMessage]] = \ + defaultdict(lambda: deque(maxlen=1000)) + self._active_order_trackers: Dict[str, HitbtcActiveOrderTracker] = defaultdict(HitbtcActiveOrderTracker) + self._order_book_stream_listener_task: Optional[asyncio.Task] = None + self._order_book_trade_listener_task: Optional[asyncio.Task] = None + + @property + def exchange_name(self) -> str: + """ + Name of the current exchange + """ + return Constants.EXCHANGE_NAME + + async def _track_single_book(self, trading_pair: str): + """ + Update an order book with changes from the latest batch of received messages + """ + past_diffs_window: Deque[HitbtcOrderBookMessage] = deque() + self._past_diffs_windows[trading_pair] = past_diffs_window + + message_queue: asyncio.Queue = self._tracking_message_queues[trading_pair] + order_book: HitbtcOrderBook = self._order_books[trading_pair] + active_order_tracker: HitbtcActiveOrderTracker = self._active_order_trackers[trading_pair] + + last_message_timestamp: float = time.time() + diff_messages_accepted: int = 0 + + while True: + try: + message: HitbtcOrderBookMessage = None + saved_messages: Deque[HitbtcOrderBookMessage] = self._saved_message_queues[trading_pair] + # Process saved messages first if there are any + if len(saved_messages) > 0: + message = saved_messages.popleft() + else: + message = await message_queue.get() + + if message.type is OrderBookMessageType.DIFF: + bids, asks = active_order_tracker.convert_diff_message_to_order_book_row(message) + order_book.apply_diffs(bids, asks, message.update_id) + past_diffs_window.append(message) + while len(past_diffs_window) > self.PAST_DIFF_WINDOW_SIZE: + past_diffs_window.popleft() + diff_messages_accepted += 1 + + # Output some statistics periodically. + now: float = time.time() + if int(now / 60.0) > int(last_message_timestamp / 60.0): + self.logger().debug(f"Processed {diff_messages_accepted} order book diffs for {trading_pair}.") + diff_messages_accepted = 0 + last_message_timestamp = now + elif message.type is OrderBookMessageType.SNAPSHOT: + past_diffs: List[HitbtcOrderBookMessage] = list(past_diffs_window) + # only replay diffs later than snapshot, first update active order with snapshot then replay diffs + replay_position = bisect.bisect_right(past_diffs, message) + replay_diffs = past_diffs[replay_position:] + s_bids, s_asks = active_order_tracker.convert_snapshot_message_to_order_book_row(message) + order_book.apply_snapshot(s_bids, s_asks, message.update_id) + for diff_message in replay_diffs: + d_bids, d_asks = active_order_tracker.convert_diff_message_to_order_book_row(diff_message) + order_book.apply_diffs(d_bids, d_asks, diff_message.update_id) + + self.logger().debug(f"Processed order book snapshot for {trading_pair}.") + except asyncio.CancelledError: + raise + except Exception: + self.logger().network( + f"Unexpected error processing order book messages for {trading_pair}.", + exc_info=True, + app_warning_msg="Unexpected error processing order book messages. Retrying after 5 seconds." + ) + await asyncio.sleep(5.0) diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_order_book_tracker_entry.py b/hummingbot/connector/exchange/hitbtc/hitbtc_order_book_tracker_entry.py new file mode 100644 index 0000000000..5edfbadec0 --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_order_book_tracker_entry.py @@ -0,0 +1,21 @@ +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.core.data_type.order_book_tracker_entry import OrderBookTrackerEntry +from hummingbot.connector.exchange.hitbtc.hitbtc_active_order_tracker import HitbtcActiveOrderTracker + + +class HitbtcOrderBookTrackerEntry(OrderBookTrackerEntry): + def __init__( + self, trading_pair: str, timestamp: float, order_book: OrderBook, active_order_tracker: HitbtcActiveOrderTracker + ): + self._active_order_tracker = active_order_tracker + super(HitbtcOrderBookTrackerEntry, self).__init__(trading_pair, timestamp, order_book) + + def __repr__(self) -> str: + return ( + f"HitbtcOrderBookTrackerEntry(trading_pair='{self._trading_pair}', timestamp='{self._timestamp}', " + f"order_book='{self._order_book}')" + ) + + @property + def active_order_tracker(self) -> HitbtcActiveOrderTracker: + return self._active_order_tracker diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_user_stream_tracker.py b/hummingbot/connector/exchange/hitbtc/hitbtc_user_stream_tracker.py new file mode 100644 index 0000000000..7b04002ccd --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_user_stream_tracker.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python + +import asyncio +import logging +from typing import ( + Optional, + List, +) +from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource +from hummingbot.logger import HummingbotLogger +from hummingbot.core.data_type.user_stream_tracker import ( + UserStreamTracker +) +from hummingbot.core.utils.async_utils import ( + safe_ensure_future, + safe_gather, +) +from hummingbot.connector.exchange.hitbtc.hitbtc_api_user_stream_data_source import \ + HitbtcAPIUserStreamDataSource +from hummingbot.connector.exchange.hitbtc.hitbtc_auth import HitbtcAuth +from hummingbot.connector.exchange.hitbtc.hitbtc_constants import Constants + + +class HitbtcUserStreamTracker(UserStreamTracker): + _cbpust_logger: Optional[HummingbotLogger] = None + + @classmethod + def logger(cls) -> HummingbotLogger: + if cls._bust_logger is None: + cls._bust_logger = logging.getLogger(__name__) + return cls._bust_logger + + def __init__(self, + hitbtc_auth: Optional[HitbtcAuth] = None, + trading_pairs: Optional[List[str]] = []): + super().__init__() + self._hitbtc_auth: HitbtcAuth = hitbtc_auth + self._trading_pairs: List[str] = trading_pairs + self._ev_loop: asyncio.events.AbstractEventLoop = asyncio.get_event_loop() + self._data_source: Optional[UserStreamTrackerDataSource] = None + self._user_stream_tracking_task: Optional[asyncio.Task] = None + + @property + def data_source(self) -> UserStreamTrackerDataSource: + """ + *required + Initializes a user stream data source (user specific order diffs from live socket stream) + :return: OrderBookTrackerDataSource + """ + if not self._data_source: + self._data_source = HitbtcAPIUserStreamDataSource( + hitbtc_auth=self._hitbtc_auth, + trading_pairs=self._trading_pairs + ) + return self._data_source + + @property + def exchange_name(self) -> str: + """ + *required + Name of the current exchange + """ + return Constants.EXCHANGE_NAME + + async def start(self): + """ + *required + Start all listeners and tasks + """ + self._user_stream_tracking_task = safe_ensure_future( + self.data_source.listen_for_user_stream(self._ev_loop, self._user_stream) + ) + await safe_gather(self._user_stream_tracking_task) diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_utils.py b/hummingbot/connector/exchange/hitbtc/hitbtc_utils.py new file mode 100644 index 0000000000..c549ce8b72 --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_utils.py @@ -0,0 +1,156 @@ +import aiohttp +import asyncio +import random +import re +from dateutil.parser import parse as dateparse +from typing import ( + Any, + Dict, + Optional, + Tuple, +) + +from hummingbot.core.utils.tracking_nonce import get_tracking_nonce +from hummingbot.client.config.config_var import ConfigVar +from hummingbot.client.config.config_methods import using_exchange +from .hitbtc_constants import Constants + + +TRADING_PAIR_SPLITTER = re.compile(Constants.TRADING_PAIR_SPLITTER) + +CENTRALIZED = True + +EXAMPLE_PAIR = "BTC-USD" + +DEFAULT_FEES = [0.1, 0.25] + + +class HitbtcAPIError(IOError): + def __init__(self, error_payload: Dict[str, Any]): + super().__init__(str(error_payload)) + self.error_payload = error_payload + + +# convert date string to timestamp +def str_date_to_ts(date: str) -> int: + return int(dateparse(date).timestamp()) + + +# Request ID class +class RequestId: + """ + Generate request ids + """ + _request_id: int = 0 + + @classmethod + def generate_request_id(cls) -> int: + return get_tracking_nonce() + + +def split_trading_pair(trading_pair: str) -> Optional[Tuple[str, str]]: + try: + m = TRADING_PAIR_SPLITTER.match(trading_pair) + return m.group(1), m.group(2) + # Exceptions are now logged as warnings in trading pair fetcher + except Exception: + return None + + +def convert_from_exchange_trading_pair(ex_trading_pair: str) -> Optional[str]: + regex_match = split_trading_pair(ex_trading_pair) + if regex_match is None: + return None + # HitBTC uses uppercase (BTCUSDT) + base_asset, quote_asset = split_trading_pair(ex_trading_pair) + return f"{base_asset.upper()}-{quote_asset.upper()}" + + +def convert_to_exchange_trading_pair(hb_trading_pair: str) -> str: + # HitBTC uses uppercase (BTCUSDT) + return hb_trading_pair.replace("-", "").upper() + + +def get_new_client_order_id(is_buy: bool, trading_pair: str) -> str: + side = "B" if is_buy else "S" + symbols = trading_pair.split("-") + base = symbols[0].upper() + quote = symbols[1].upper() + base_str = f"{base[0]}{base[-1]}" + quote_str = f"{quote[0]}{quote[-1]}" + return f"{Constants.HBOT_BROKER_ID}-{side}-{base_str}{quote_str}-{get_tracking_nonce()}" + + +def retry_sleep_time(try_count: int) -> float: + random.seed() + randSleep = 1 + float(random.randint(1, 10) / 100) + return float(2 + float(randSleep * (1 + (try_count ** try_count)))) + + +async def aiohttp_response_with_errors(request_coroutine): + http_status, parsed_response, request_errors = None, None, False + try: + async with request_coroutine as response: + http_status = response.status + try: + parsed_response = await response.json() + except Exception: + request_errors = True + try: + parsed_response = str(await response.read()) + if len(parsed_response) > 100: + parsed_response = f"{parsed_response[:100]} ... (truncated)" + except Exception: + pass + TempFailure = (parsed_response is None or + (response.status not in [200, 201] and "error" not in parsed_response)) + if TempFailure: + parsed_response = response.reason if parsed_response is None else parsed_response + request_errors = True + except Exception: + request_errors = True + return http_status, parsed_response, request_errors + + +async def api_call_with_retries(method, + endpoint, + params: Optional[Dict[str, Any]] = None, + shared_client=None, + try_count: int = 0) -> Dict[str, Any]: + url = f"{Constants.REST_URL}/{endpoint}" + headers = {"Content-Type": "application/json"} + http_client = shared_client if shared_client is not None else aiohttp.ClientSession() + # Build request coro + response_coro = http_client.request(method=method.upper(), url=url, headers=headers, + params=params, timeout=Constants.API_CALL_TIMEOUT) + http_status, parsed_response, request_errors = await aiohttp_response_with_errors(response_coro) + if shared_client is None: + await http_client.close() + if request_errors or parsed_response is None: + if try_count < Constants.API_MAX_RETRIES: + try_count += 1 + time_sleep = retry_sleep_time(try_count) + print(f"Error fetching data from {url}. HTTP status is {http_status}. " + f"Retrying in {time_sleep:.0f}s.") + await asyncio.sleep(time_sleep) + return await api_call_with_retries(method=method, endpoint=endpoint, params=params, + shared_client=shared_client, try_count=try_count) + else: + raise HitbtcAPIError({"error": parsed_response, "status": http_status}) + return parsed_response + + +KEYS = { + "hitbtc_api_key": + ConfigVar(key="hitbtc_api_key", + prompt=f"Enter your {Constants.EXCHANGE_NAME} API key >>> ", + required_if=using_exchange("hitbtc"), + is_secure=True, + is_connect_key=True), + "hitbtc_secret_key": + ConfigVar(key="hitbtc_secret_key", + prompt=f"Enter your {Constants.EXCHANGE_NAME} secret key >>> ", + required_if=using_exchange("hitbtc"), + is_secure=True, + is_connect_key=True), +} diff --git a/hummingbot/connector/exchange/hitbtc/hitbtc_websocket.py b/hummingbot/connector/exchange/hitbtc/hitbtc_websocket.py new file mode 100644 index 0000000000..da65b869a2 --- /dev/null +++ b/hummingbot/connector/exchange/hitbtc/hitbtc_websocket.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python +import asyncio +import copy +import logging +import websockets +import json +from hummingbot.connector.exchange.hitbtc.hitbtc_constants import Constants + + +from typing import ( + Any, + AsyncIterable, + Dict, + Optional, +) +from websockets.exceptions import ConnectionClosed +from hummingbot.logger import HummingbotLogger +from hummingbot.connector.exchange.hitbtc.hitbtc_auth import HitbtcAuth +from hummingbot.connector.exchange.hitbtc.hitbtc_utils import ( + RequestId, + HitbtcAPIError, +) + +# reusable websocket class +# ToDo: We should eventually remove this class, and instantiate web socket connection normally (see Binance for example) + + +class HitbtcWebsocket(RequestId): + _logger: Optional[HummingbotLogger] = None + + @classmethod + def logger(cls) -> HummingbotLogger: + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + def __init__(self, + auth: Optional[HitbtcAuth] = None): + self._auth: Optional[HitbtcAuth] = auth + self._isPrivate = True if self._auth is not None else False + self._WS_URL = Constants.WS_PRIVATE_URL if self._isPrivate else Constants.WS_PUBLIC_URL + self._client: Optional[websockets.WebSocketClientProtocol] = None + + # connect to exchange + async def connect(self): + self._client = await websockets.connect(self._WS_URL) + + # if auth class was passed into websocket class + # we need to emit authenticated requests + if self._isPrivate: + auth_params = self._auth.generate_auth_dict_ws(self.generate_request_id()) + await self._emit("login", auth_params, no_id=True) + raw_msg_str: str = await asyncio.wait_for(self._client.recv(), timeout=Constants.MESSAGE_TIMEOUT) + json_msg = json.loads(raw_msg_str) + if json_msg.get("result") is not True: + err_msg = json_msg.get('error', {}).get('message') + raise HitbtcAPIError({"error": f"Failed to authenticate to websocket - {err_msg}."}) + + return self._client + + # disconnect from exchange + async def disconnect(self): + if self._client is None: + return + + await self._client.close() + + # receive & parse messages + async def _messages(self) -> AsyncIterable[Any]: + try: + while True: + try: + raw_msg_str: str = await asyncio.wait_for(self._client.recv(), timeout=Constants.MESSAGE_TIMEOUT) + try: + msg = json.loads(raw_msg_str) + # HitBTC doesn't support ping or heartbeat messages. + # Can handle them here if that changes - use `safe_ensure_future`. + yield msg + except ValueError: + continue + except asyncio.TimeoutError: + await asyncio.wait_for(self._client.ping(), timeout=Constants.PING_TIMEOUT) + except asyncio.TimeoutError: + self.logger().warning("WebSocket ping timed out. Going to reconnect...") + return + except ConnectionClosed: + return + finally: + await self.disconnect() + + # emit messages + async def _emit(self, method: str, data: Optional[Dict[str, Any]] = {}, no_id: bool = False) -> int: + id = self.generate_request_id() + + payload = { + "id": id, + "method": method, + "params": copy.deepcopy(data), + } + + await self._client.send(json.dumps(payload)) + + return id + + # request via websocket + async def request(self, method: str, data: Optional[Dict[str, Any]] = {}) -> int: + return await self._emit(method, data) + + # subscribe to a method + async def subscribe(self, + channel: str, + trading_pair: Optional[str] = None, + params: Optional[Dict[str, Any]] = {}) -> int: + if trading_pair is not None: + params['symbol'] = trading_pair + return await self.request(f"subscribe{channel}", params) + + # unsubscribe to a method + async def unsubscribe(self, + channel: str, + trading_pair: Optional[str] = None, + params: Optional[Dict[str, Any]] = {}) -> int: + if trading_pair is not None: + params['symbol'] = trading_pair + return await self.request(f"unsubscribe{channel}", params) + + # listen to messages by method + async def on_message(self) -> AsyncIterable[Any]: + async for msg in self._messages(): + yield msg diff --git a/hummingbot/templates/conf_fee_overrides_TEMPLATE.yml b/hummingbot/templates/conf_fee_overrides_TEMPLATE.yml index ce38789572..ed08e2fa90 100644 --- a/hummingbot/templates/conf_fee_overrides_TEMPLATE.yml +++ b/hummingbot/templates/conf_fee_overrides_TEMPLATE.yml @@ -45,6 +45,9 @@ dolomite_maker_fee_amount: dolomite_taker_fee_amount: +hitbtc_maker_fee: +hitbtc_taker_fee: + loopring_maker_fee: loopring_taker_fee: diff --git a/hummingbot/templates/conf_global_TEMPLATE.yml b/hummingbot/templates/conf_global_TEMPLATE.yml index e66886f105..2d3af846f3 100644 --- a/hummingbot/templates/conf_global_TEMPLATE.yml +++ b/hummingbot/templates/conf_global_TEMPLATE.yml @@ -59,6 +59,9 @@ crypto_com_api_key: null crypto_com_secret_key: null +hitbtc_api_key: null +hitbtc_secret_key: null + bitfinex_api_key: null bitfinex_secret_key: null diff --git a/setup.py b/setup.py index 6cd0dd9002..083859ad4c 100755 --- a/setup.py +++ b/setup.py @@ -57,6 +57,7 @@ def main(): "hummingbot.connector.exchange.eterbase", "hummingbot.connector.exchange.beaxy", "hummingbot.connector.exchange.bitmax", + "hummingbot.connector.exchange.hitbtc", "hummingbot.connector.derivative", "hummingbot.connector.derivative.binance_perpetual", "hummingbot.script", diff --git a/test/connector/exchange/hitbtc/.gitignore b/test/connector/exchange/hitbtc/.gitignore new file mode 100644 index 0000000000..23d9952b8c --- /dev/null +++ b/test/connector/exchange/hitbtc/.gitignore @@ -0,0 +1 @@ +backups \ No newline at end of file diff --git a/test/connector/exchange/hitbtc/__init__.py b/test/connector/exchange/hitbtc/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/connector/exchange/hitbtc/test_hitbtc_auth.py b/test/connector/exchange/hitbtc/test_hitbtc_auth.py new file mode 100644 index 0000000000..1943412ea3 --- /dev/null +++ b/test/connector/exchange/hitbtc/test_hitbtc_auth.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +import sys +import asyncio +import unittest +import aiohttp +import conf +import logging +from os.path import join, realpath +from typing import Dict, Any +from hummingbot.connector.exchange.hitbtc.hitbtc_auth import HitbtcAuth +from hummingbot.connector.exchange.hitbtc.hitbtc_websocket import HitbtcWebsocket +from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL +from hummingbot.connector.exchange.hitbtc.hitbtc_constants import Constants + +sys.path.insert(0, realpath(join(__file__, "../../../../../"))) +logging.basicConfig(level=METRICS_LOG_LEVEL) + + +class TestAuth(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop() + api_key = conf.hitbtc_api_key + secret_key = conf.hitbtc_secret_key + cls.auth = HitbtcAuth(api_key, secret_key) + + async def rest_auth(self) -> Dict[Any, Any]: + endpoint = Constants.ENDPOINT['USER_BALANCES'] + headers = self.auth.get_headers("GET", f"{Constants.REST_URL_AUTH}/{endpoint}", None) + http_client = aiohttp.ClientSession() + response = await http_client.get(f"{Constants.REST_URL}/{endpoint}", headers=headers) + await http_client.close() + return await response.json() + + async def ws_auth(self) -> Dict[Any, Any]: + ws = HitbtcWebsocket(self.auth) + await ws.connect() + await ws.subscribe(Constants.WS_SUB["USER_ORDERS_TRADES"], None, {}) + async for response in ws.on_message(): + return response + + def test_rest_auth(self): + result = self.ev_loop.run_until_complete(self.rest_auth()) + if len(result) == 0 or "currency" not in result[0].keys(): + print(f"Unexpected response for API call: {result}") + assert "currency" in result[0].keys() + + def test_ws_auth(self): + response = self.ev_loop.run_until_complete(self.ws_auth()) + if 'result' not in response: + print(f"Unexpected response for API call: {response}") + assert response['result'] is True diff --git a/test/connector/exchange/hitbtc/test_hitbtc_exchange.py b/test/connector/exchange/hitbtc/test_hitbtc_exchange.py new file mode 100644 index 0000000000..0456f5a8a9 --- /dev/null +++ b/test/connector/exchange/hitbtc/test_hitbtc_exchange.py @@ -0,0 +1,438 @@ +from os.path import join, realpath +import sys; sys.path.insert(0, realpath(join(__file__, "../../../../../"))) +import asyncio +import logging +from decimal import Decimal +import unittest +import contextlib +import time +import os +from typing import List +import conf +import math + +from hummingbot.core.clock import Clock, ClockMode +from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL +from hummingbot.core.utils.async_utils import safe_gather, safe_ensure_future +from hummingbot.core.event.event_logger import EventLogger +from hummingbot.core.event.events import ( + BuyOrderCompletedEvent, + BuyOrderCreatedEvent, + MarketEvent, + OrderFilledEvent, + OrderType, + SellOrderCompletedEvent, + SellOrderCreatedEvent, + OrderCancelledEvent +) +from hummingbot.model.sql_connection_manager import ( + SQLConnectionManager, + SQLConnectionType +) +from hummingbot.model.market_state import MarketState +from hummingbot.model.order import Order +from hummingbot.model.trade_fill import TradeFill +from hummingbot.connector.markets_recorder import MarketsRecorder +from hummingbot.connector.exchange.hitbtc.hitbtc_exchange import HitbtcExchange + +logging.basicConfig(level=METRICS_LOG_LEVEL) + +API_KEY = conf.hitbtc_api_key +API_SECRET = conf.hitbtc_secret_key + + +class HitbtcExchangeUnitTest(unittest.TestCase): + events: List[MarketEvent] = [ + MarketEvent.BuyOrderCompleted, + MarketEvent.SellOrderCompleted, + MarketEvent.OrderFilled, + MarketEvent.TransactionFailure, + MarketEvent.BuyOrderCreated, + MarketEvent.SellOrderCreated, + MarketEvent.OrderCancelled, + MarketEvent.OrderFailure + ] + connector: HitbtcExchange + event_logger: EventLogger + trading_pair = "BTC-USD" + base_token, quote_token = trading_pair.split("-") + stack: contextlib.ExitStack + + @classmethod + def setUpClass(cls): + global MAINNET_RPC_URL + + cls.ev_loop = asyncio.get_event_loop() + + cls.clock: Clock = Clock(ClockMode.REALTIME) + cls.connector: HitbtcExchange = HitbtcExchange( + hitbtc_api_key=API_KEY, + hitbtc_secret_key=API_SECRET, + trading_pairs=[cls.trading_pair], + trading_required=True + ) + print("Initializing Hitbtc market... this will take about a minute.") + cls.clock.add_iterator(cls.connector) + cls.stack: contextlib.ExitStack = contextlib.ExitStack() + cls._clock = cls.stack.enter_context(cls.clock) + cls.ev_loop.run_until_complete(cls.wait_til_ready()) + print("Ready.") + + @classmethod + def tearDownClass(cls) -> None: + cls.stack.close() + + @classmethod + async def wait_til_ready(cls, connector = None): + if connector is None: + connector = cls.connector + while True: + now = time.time() + next_iteration = now // 1.0 + 1 + if connector.ready: + break + else: + await cls._clock.run_til(next_iteration) + await asyncio.sleep(1.0) + + def setUp(self): + self.db_path: str = realpath(join(__file__, "../connector_test.sqlite")) + try: + os.unlink(self.db_path) + except FileNotFoundError: + pass + + self.event_logger = EventLogger() + for event_tag in self.events: + self.connector.add_listener(event_tag, self.event_logger) + + def tearDown(self): + for event_tag in self.events: + self.connector.remove_listener(event_tag, self.event_logger) + self.event_logger = None + + async def run_parallel_async(self, *tasks): + future: asyncio.Future = safe_ensure_future(safe_gather(*tasks)) + while not future.done(): + now = time.time() + next_iteration = now // 1.0 + 1 + await self._clock.run_til(next_iteration) + await asyncio.sleep(1.0) + return future.result() + + def run_parallel(self, *tasks): + return self.ev_loop.run_until_complete(self.run_parallel_async(*tasks)) + + def _place_order(self, is_buy, amount, order_type, price, ex_order_id) -> str: + if is_buy: + cl_order_id = self.connector.buy(self.trading_pair, amount, order_type, price) + else: + cl_order_id = self.connector.sell(self.trading_pair, amount, order_type, price) + return cl_order_id + + def _cancel_order(self, cl_order_id, connector=None): + if connector is None: + connector = self.connector + return connector.cancel(self.trading_pair, cl_order_id) + + def test_estimate_fee(self): + maker_fee = self.connector.estimate_fee_pct(True) + self.assertAlmostEqual(maker_fee, Decimal("0.001")) + taker_fee = self.connector.estimate_fee_pct(False) + self.assertAlmostEqual(taker_fee, Decimal("0.0025")) + + def test_buy_and_sell(self): + price = self.connector.get_price(self.trading_pair, True) * Decimal("1.02") + price = self.connector.quantize_order_price(self.trading_pair, price) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + quote_bal = self.connector.get_available_balance(self.quote_token) + base_bal = self.connector.get_available_balance(self.base_token) + + order_id = self._place_order(True, amount, OrderType.LIMIT, price, 1) + order_completed_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCompletedEvent)) + self.ev_loop.run_until_complete(asyncio.sleep(5)) + trade_events = [t for t in self.event_logger.event_log if isinstance(t, OrderFilledEvent)] + base_amount_traded = sum(t.amount for t in trade_events) + quote_amount_traded = sum(t.amount * t.price for t in trade_events) + + self.assertTrue([evt.order_type == OrderType.LIMIT for evt in trade_events]) + self.assertEqual(order_id, order_completed_event.order_id) + self.assertEqual(amount, order_completed_event.base_asset_amount) + self.assertEqual("BTC", order_completed_event.base_asset) + self.assertEqual("USD", order_completed_event.quote_asset) + self.assertAlmostEqual(base_amount_traded, order_completed_event.base_asset_amount) + self.assertAlmostEqual(quote_amount_traded, order_completed_event.quote_asset_amount) + self.assertGreater(order_completed_event.fee_amount, Decimal(0)) + self.assertTrue(any([isinstance(event, BuyOrderCreatedEvent) and str(event.order_id) == str(order_id) + for event in self.event_logger.event_log])) + + # check available quote balance gets updated, we need to wait a bit for the balance message to arrive + expected_quote_bal = quote_bal - quote_amount_traded + # self.ev_loop.run_until_complete(asyncio.sleep(1)) + self.ev_loop.run_until_complete(self.connector._update_balances()) + self.assertAlmostEqual(expected_quote_bal, self.connector.get_available_balance(self.quote_token), 1) + + # Reset the logs + self.event_logger.clear() + + # Try to sell back the same amount to the exchange, and watch for completion event. + price = self.connector.get_price(self.trading_pair, True) * Decimal("0.98") + price = self.connector.quantize_order_price(self.trading_pair, price) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + order_id = self._place_order(False, amount, OrderType.LIMIT, price, 2) + order_completed_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(SellOrderCompletedEvent)) + trade_events = [t for t in self.event_logger.event_log if isinstance(t, OrderFilledEvent)] + base_amount_traded = sum(t.amount for t in trade_events) + quote_amount_traded = sum(t.amount * t.price for t in trade_events) + + self.assertTrue([evt.order_type == OrderType.LIMIT for evt in trade_events]) + self.assertEqual(order_id, order_completed_event.order_id) + self.assertEqual(amount, order_completed_event.base_asset_amount) + self.assertEqual("BTC", order_completed_event.base_asset) + self.assertEqual("USD", order_completed_event.quote_asset) + self.assertAlmostEqual(base_amount_traded, order_completed_event.base_asset_amount) + self.assertAlmostEqual(quote_amount_traded, order_completed_event.quote_asset_amount) + self.assertGreater(order_completed_event.fee_amount, Decimal(0)) + self.assertTrue(any([isinstance(event, SellOrderCreatedEvent) and event.order_id == order_id + for event in self.event_logger.event_log])) + + # check available base balance gets updated, we need to wait a bit for the balance message to arrive + expected_base_bal = base_bal + self.ev_loop.run_until_complete(asyncio.sleep(1)) + self.ev_loop.run_until_complete(self.connector._update_balances()) + self.ev_loop.run_until_complete(asyncio.sleep(5)) + self.assertAlmostEqual(expected_base_bal, self.connector.get_available_balance(self.base_token), 5) + + def test_limit_makers_unfilled(self): + price = self.connector.get_price(self.trading_pair, True) * Decimal("0.8") + price = self.connector.quantize_order_price(self.trading_pair, price) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + self.ev_loop.run_until_complete(asyncio.sleep(1)) + self.ev_loop.run_until_complete(self.connector._update_balances()) + self.ev_loop.run_until_complete(asyncio.sleep(2)) + quote_bal = self.connector.get_available_balance(self.quote_token) + + cl_order_id = self._place_order(True, amount, OrderType.LIMIT_MAKER, price, 1) + order_created_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCreatedEvent)) + self.assertEqual(cl_order_id, order_created_event.order_id) + # check available quote balance gets updated, we need to wait a bit for the balance message to arrive + taker_fee = self.connector.estimate_fee_pct(False) + quote_amount = ((price * amount)) + quote_amount = ((price * amount) * (Decimal("1") + taker_fee)) + expected_quote_bal = quote_bal - quote_amount + self.ev_loop.run_until_complete(asyncio.sleep(1)) + self.ev_loop.run_until_complete(self.connector._update_balances()) + self.ev_loop.run_until_complete(asyncio.sleep(2)) + + self.assertAlmostEqual(expected_quote_bal, self.connector.get_available_balance(self.quote_token), 5) + self._cancel_order(cl_order_id) + event = self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + self.assertEqual(cl_order_id, event.order_id) + + price = self.connector.get_price(self.trading_pair, True) * Decimal("1.2") + price = self.connector.quantize_order_price(self.trading_pair, price) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + + cl_order_id = self._place_order(False, amount, OrderType.LIMIT_MAKER, price, 2) + order_created_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(SellOrderCreatedEvent)) + self.assertEqual(cl_order_id, order_created_event.order_id) + self._cancel_order(cl_order_id) + event = self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + self.assertEqual(cl_order_id, event.order_id) + + # # @TODO: find a way to create "rejected" + # def test_limit_maker_rejections(self): + # price = self.connector.get_price(self.trading_pair, True) * Decimal("1.2") + # price = self.connector.quantize_order_price(self.trading_pair, price) + # amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.000001")) + # cl_order_id = self._place_order(True, amount, OrderType.LIMIT_MAKER, price, 1) + # event = self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + # self.assertEqual(cl_order_id, event.order_id) + + # price = self.connector.get_price(self.trading_pair, False) * Decimal("0.8") + # price = self.connector.quantize_order_price(self.trading_pair, price) + # amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.000001")) + # cl_order_id = self._place_order(False, amount, OrderType.LIMIT_MAKER, price, 2) + # event = self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + # self.assertEqual(cl_order_id, event.order_id) + + def test_cancel_all(self): + bid_price = self.connector.get_price(self.trading_pair, True) + ask_price = self.connector.get_price(self.trading_pair, False) + bid_price = self.connector.quantize_order_price(self.trading_pair, bid_price * Decimal("0.9")) + ask_price = self.connector.quantize_order_price(self.trading_pair, ask_price * Decimal("1.1")) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + + buy_id = self._place_order(True, amount, OrderType.LIMIT, bid_price, 1) + sell_id = self._place_order(False, amount, OrderType.LIMIT, ask_price, 2) + + self.ev_loop.run_until_complete(asyncio.sleep(1)) + asyncio.ensure_future(self.connector.cancel_all(5)) + self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + self.ev_loop.run_until_complete(asyncio.sleep(1)) + cancel_events = [t for t in self.event_logger.event_log if isinstance(t, OrderCancelledEvent)] + self.assertEqual({buy_id, sell_id}, {o.order_id for o in cancel_events}) + + def test_order_quantized_values(self): + bid_price: Decimal = self.connector.get_price(self.trading_pair, True) + ask_price: Decimal = self.connector.get_price(self.trading_pair, False) + mid_price: Decimal = (bid_price + ask_price) / 2 + + # Make sure there's enough balance to make the limit orders. + self.assertGreater(self.connector.get_balance("BTC"), Decimal("0.0005")) + self.assertGreater(self.connector.get_balance("USD"), Decimal("10")) + + # Intentionally set some prices with too many decimal places s.t. they + # need to be quantized. Also, place them far away from the mid-price s.t. they won't + # get filled during the test. + bid_price = self.connector.quantize_order_price(self.trading_pair, mid_price * Decimal("0.9333192292111341")) + ask_price = self.connector.quantize_order_price(self.trading_pair, mid_price * Decimal("1.1492431474884933")) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.000123456")) + + # Test bid order + cl_order_id_1 = self._place_order(True, amount, OrderType.LIMIT, bid_price, 1) + # Wait for the order created event and examine the order made + self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCreatedEvent)) + + # Test ask order + cl_order_id_2 = self._place_order(False, amount, OrderType.LIMIT, ask_price, 1) + # Wait for the order created event and examine and order made + self.ev_loop.run_until_complete(self.event_logger.wait_for(SellOrderCreatedEvent)) + + self._cancel_order(cl_order_id_1) + self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + self._cancel_order(cl_order_id_2) + self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + + def test_orders_saving_and_restoration(self): + config_path = "test_config" + strategy_name = "test_strategy" + sql = SQLConnectionManager(SQLConnectionType.TRADE_FILLS, db_path=self.db_path) + order_id = None + recorder = MarketsRecorder(sql, [self.connector], config_path, strategy_name) + recorder.start() + + try: + self.connector._in_flight_orders.clear() + self.assertEqual(0, len(self.connector.tracking_states)) + + # Try to put limit buy order for 0.02 ETH worth of ZRX, and watch for order creation event. + current_bid_price: Decimal = self.connector.get_price(self.trading_pair, True) + price: Decimal = current_bid_price * Decimal("0.8") + price = self.connector.quantize_order_price(self.trading_pair, price) + + amount: Decimal = Decimal("0.0001") + amount = self.connector.quantize_order_amount(self.trading_pair, amount) + + cl_order_id = self._place_order(True, amount, OrderType.LIMIT_MAKER, price, 1) + order_created_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCreatedEvent)) + self.assertEqual(cl_order_id, order_created_event.order_id) + + # Verify tracking states + self.assertEqual(1, len(self.connector.tracking_states)) + self.assertEqual(cl_order_id, list(self.connector.tracking_states.keys())[0]) + + # Verify orders from recorder + recorded_orders: List[Order] = recorder.get_orders_for_config_and_market(config_path, self.connector) + self.assertEqual(1, len(recorded_orders)) + self.assertEqual(cl_order_id, recorded_orders[0].id) + + # Verify saved market states + saved_market_states: MarketState = recorder.get_market_states(config_path, self.connector) + self.assertIsNotNone(saved_market_states) + self.assertIsInstance(saved_market_states.saved_state, dict) + self.assertGreater(len(saved_market_states.saved_state), 0) + + # Close out the current market and start another market. + self.connector.stop(self._clock) + self.ev_loop.run_until_complete(asyncio.sleep(5)) + self.clock.remove_iterator(self.connector) + for event_tag in self.events: + self.connector.remove_listener(event_tag, self.event_logger) + # Clear the event loop + self.event_logger.clear() + new_connector = HitbtcExchange(API_KEY, API_SECRET, [self.trading_pair], True) + for event_tag in self.events: + new_connector.add_listener(event_tag, self.event_logger) + recorder.stop() + recorder = MarketsRecorder(sql, [new_connector], config_path, strategy_name) + recorder.start() + saved_market_states = recorder.get_market_states(config_path, new_connector) + self.clock.add_iterator(new_connector) + self.ev_loop.run_until_complete(self.wait_til_ready(new_connector)) + self.assertEqual(0, len(new_connector.limit_orders)) + self.assertEqual(0, len(new_connector.tracking_states)) + new_connector.restore_tracking_states(saved_market_states.saved_state) + self.assertEqual(1, len(new_connector.limit_orders)) + self.assertEqual(1, len(new_connector.tracking_states)) + + # Cancel the order and verify that the change is saved. + self._cancel_order(cl_order_id, new_connector) + self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent)) + recorder.save_market_states(config_path, new_connector) + order_id = None + self.assertEqual(0, len(new_connector.limit_orders)) + self.assertEqual(0, len(new_connector.tracking_states)) + saved_market_states = recorder.get_market_states(config_path, new_connector) + self.assertEqual(0, len(saved_market_states.saved_state)) + finally: + if order_id is not None: + self.connector.cancel(self.trading_pair, cl_order_id) + self.run_parallel(self.event_logger.wait_for(OrderCancelledEvent)) + + recorder.stop() + os.unlink(self.db_path) + + def test_update_last_prices(self): + # This is basic test to see if order_book last_trade_price is initiated and updated. + for order_book in self.connector.order_books.values(): + for _ in range(5): + self.ev_loop.run_until_complete(asyncio.sleep(1)) + self.assertFalse(math.isnan(order_book.last_trade_price)) + + def test_filled_orders_recorded(self): + config_path: str = "test_config" + strategy_name: str = "test_strategy" + sql = SQLConnectionManager(SQLConnectionType.TRADE_FILLS, db_path=self.db_path) + order_id = None + recorder = MarketsRecorder(sql, [self.connector], config_path, strategy_name) + recorder.start() + + try: + # Try to buy some token from the exchange, and watch for completion event. + price = self.connector.get_price(self.trading_pair, True) * Decimal("1.05") + price = self.connector.quantize_order_price(self.trading_pair, price) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + + order_id = self._place_order(True, amount, OrderType.LIMIT, price, 1) + self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCompletedEvent)) + self.ev_loop.run_until_complete(asyncio.sleep(1)) + + # Reset the logs + self.event_logger.clear() + + # Try to sell back the same amount to the exchange, and watch for completion event. + price = self.connector.get_price(self.trading_pair, True) * Decimal("0.95") + price = self.connector.quantize_order_price(self.trading_pair, price) + amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001")) + order_id = self._place_order(False, amount, OrderType.LIMIT, price, 2) + self.ev_loop.run_until_complete(self.event_logger.wait_for(SellOrderCompletedEvent)) + self.ev_loop.run_until_complete(asyncio.sleep(1)) + + # Query the persisted trade logs + trade_fills: List[TradeFill] = recorder.get_trades_for_config(config_path) + self.assertGreaterEqual(len(trade_fills), 2) + buy_fills: List[TradeFill] = [t for t in trade_fills if t.trade_type == "BUY"] + sell_fills: List[TradeFill] = [t for t in trade_fills if t.trade_type == "SELL"] + self.assertGreaterEqual(len(buy_fills), 1) + self.assertGreaterEqual(len(sell_fills), 1) + + order_id = None + + finally: + if order_id is not None: + self.connector.cancel(self.trading_pair, order_id) + self.run_parallel(self.event_logger.wait_for(OrderCancelledEvent)) + + recorder.stop() + os.unlink(self.db_path) diff --git a/test/connector/exchange/hitbtc/test_hitbtc_order_book_tracker.py b/test/connector/exchange/hitbtc/test_hitbtc_order_book_tracker.py new file mode 100755 index 0000000000..ae3778e7c9 --- /dev/null +++ b/test/connector/exchange/hitbtc/test_hitbtc_order_book_tracker.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python +import sys +import math +import time +import asyncio +import logging +import unittest +from os.path import join, realpath +from typing import Dict, Optional, List +from hummingbot.core.event.event_logger import EventLogger +from hummingbot.core.event.events import OrderBookEvent, OrderBookTradeEvent, TradeType +from hummingbot.connector.exchange.hitbtc.hitbtc_order_book_tracker import HitbtcOrderBookTracker +from hummingbot.connector.exchange.hitbtc.hitbtc_api_order_book_data_source import HitbtcAPIOrderBookDataSource +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL + + +sys.path.insert(0, realpath(join(__file__, "../../../../../"))) +logging.basicConfig(level=METRICS_LOG_LEVEL) + + +class HitbtcOrderBookTrackerUnitTest(unittest.TestCase): + order_book_tracker: Optional[HitbtcOrderBookTracker] = None + events: List[OrderBookEvent] = [ + OrderBookEvent.TradeEvent + ] + trading_pairs: List[str] = [ + "BTC-USD", + "ETH-USD", + ] + + @classmethod + def setUpClass(cls): + cls.ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop() + cls.order_book_tracker: HitbtcOrderBookTracker = HitbtcOrderBookTracker(cls.trading_pairs) + cls.order_book_tracker.start() + cls.ev_loop.run_until_complete(cls.wait_til_tracker_ready()) + + @classmethod + async def wait_til_tracker_ready(cls): + while True: + if len(cls.order_book_tracker.order_books) > 0: + print("Initialized real-time order books.") + return + await asyncio.sleep(1) + + async def run_parallel_async(self, *tasks, timeout=None): + future: asyncio.Future = asyncio.ensure_future(asyncio.gather(*tasks)) + timer = 0 + while not future.done(): + if timeout and timer > timeout: + raise Exception("Timeout running parallel async tasks in tests") + timer += 1 + now = time.time() + _next_iteration = now // 1.0 + 1 # noqa: F841 + await asyncio.sleep(1.0) + return future.result() + + def run_parallel(self, *tasks): + return self.ev_loop.run_until_complete(self.run_parallel_async(*tasks)) + + def setUp(self): + self.event_logger = EventLogger() + for event_tag in self.events: + for trading_pair, order_book in self.order_book_tracker.order_books.items(): + order_book.add_listener(event_tag, self.event_logger) + + def test_order_book_trade_event_emission(self): + """ + Tests if the order book tracker is able to retrieve order book trade message from exchange and emit order book + trade events after correctly parsing the trade messages + """ + self.run_parallel(self.event_logger.wait_for(OrderBookTradeEvent)) + for ob_trade_event in self.event_logger.event_log: + self.assertTrue(type(ob_trade_event) == OrderBookTradeEvent) + self.assertTrue(ob_trade_event.trading_pair in self.trading_pairs) + self.assertTrue(type(ob_trade_event.timestamp) in [float, int]) + self.assertTrue(type(ob_trade_event.amount) == float) + self.assertTrue(type(ob_trade_event.price) == float) + self.assertTrue(type(ob_trade_event.type) == TradeType) + # datetime is in seconds + self.assertTrue(math.ceil(math.log10(ob_trade_event.timestamp)) == 10) + self.assertTrue(ob_trade_event.amount > 0) + self.assertTrue(ob_trade_event.price > 0) + + def test_tracker_integrity(self): + # Wait 5 seconds to process some diffs. + self.ev_loop.run_until_complete(asyncio.sleep(5.0)) + order_books: Dict[str, OrderBook] = self.order_book_tracker.order_books + eth_usd: OrderBook = order_books["ETH-USD"] + self.assertIsNot(eth_usd.last_diff_uid, 0) + self.assertGreaterEqual(eth_usd.get_price_for_volume(True, 10).result_price, + eth_usd.get_price(True)) + self.assertLessEqual(eth_usd.get_price_for_volume(False, 10).result_price, + eth_usd.get_price(False)) + + def test_api_get_last_traded_prices(self): + prices = self.ev_loop.run_until_complete( + HitbtcAPIOrderBookDataSource.get_last_traded_prices(["BTC-USD", "LTC-BTC"])) + for key, value in prices.items(): + print(f"{key} last_trade_price: {value}") + self.assertGreater(prices["BTC-USD"], 1000) + self.assertLess(prices["LTC-BTC"], 1) diff --git a/test/connector/exchange/hitbtc/test_hitbtc_user_stream_tracker.py b/test/connector/exchange/hitbtc/test_hitbtc_user_stream_tracker.py new file mode 100644 index 0000000000..5c82f2372b --- /dev/null +++ b/test/connector/exchange/hitbtc/test_hitbtc_user_stream_tracker.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python + +import sys +import asyncio +import logging +import unittest +import conf + +from os.path import join, realpath +from hummingbot.connector.exchange.hitbtc.hitbtc_user_stream_tracker import HitbtcUserStreamTracker +from hummingbot.connector.exchange.hitbtc.hitbtc_auth import HitbtcAuth +from hummingbot.core.utils.async_utils import safe_ensure_future +from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL + + +sys.path.insert(0, realpath(join(__file__, "../../../../../"))) +logging.basicConfig(level=METRICS_LOG_LEVEL) + + +class HitbtcUserStreamTrackerUnitTest(unittest.TestCase): + api_key = conf.hitbtc_api_key + api_secret = conf.hitbtc_secret_key + + @classmethod + def setUpClass(cls): + cls.ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop() + cls.trading_pairs = ["BTC-USD"] + cls.user_stream_tracker: HitbtcUserStreamTracker = HitbtcUserStreamTracker( + hitbtc_auth=HitbtcAuth(cls.api_key, cls.api_secret), + trading_pairs=cls.trading_pairs) + cls.user_stream_tracker_task: asyncio.Task = safe_ensure_future(cls.user_stream_tracker.start()) + + def test_user_stream(self): + # Wait process some msgs. + print("Sleeping for 30s to gather some user stream messages.") + self.ev_loop.run_until_complete(asyncio.sleep(30.0)) + print(self.user_stream_tracker.user_stream)