diff --git a/README.md b/README.md
index f668e3d96f..d3b4b0a8dc 100644
--- a/README.md
+++ b/README.md
@@ -33,6 +33,7 @@ We created hummingbot to promote **decentralized market-making**: enabling membe
| | bitfinex | [Bitfinex](https://www.bitfinex.com/) | 2 | [API](https://docs.bitfinex.com/docs/introduction) |![YELLOW](https://via.placeholder.com/15/ffff00/?text=+) |
| | blocktane | [Blocktane](https://blocktane.io/) | 2 | [API](https://blocktane.io/api) |![GREEN](https://via.placeholder.com/15/008000/?text=+) |
| | coinbase_pro | [Coinbase Pro](https://pro.coinbase.com/) | * | [API](https://docs.pro.coinbase.com/) |![GREEN](https://via.placeholder.com/15/008000/?text=+) |
+| | coinzoom | [CoinZoom](https://trade.coinzoom.com/landing) | * | [API](https://api-docs.coinzoom.com/) |![YELLOW](https://via.placeholder.com/15/ffff00/?text=+) |
| | crypto_com | [Crypto.com](https://crypto.com/exchange) | 2 | [API](https://exchange-docs.crypto.com/#introduction) |![YELLOW](https://via.placeholder.com/15/ffff00/?text=+) |
| | dydx | [dy/dx](https://dydx.exchange/) | 1 | [API](https://docs.dydx.exchange/) |![GREEN](https://via.placeholder.com/15/008000/?text=+) |
| | eterbase | [Eterbase](https://www.eterbase.com/) | * | [API](https://developers.eterbase.exchange/?version=latest) |![RED](https://via.placeholder.com/15/f03c15/?text=+) |
diff --git a/assets/coinzoom_logo.svg b/assets/coinzoom_logo.svg
new file mode 100644
index 0000000000..8184f907e7
--- /dev/null
+++ b/assets/coinzoom_logo.svg
@@ -0,0 +1,16 @@
+
diff --git a/conf/__init__.py b/conf/__init__.py
index 7854c51a99..69e87838fa 100644
--- a/conf/__init__.py
+++ b/conf/__init__.py
@@ -108,6 +108,11 @@
hitbtc_api_key = os.getenv("HITBTC_API_KEY")
hitbtc_secret_key = os.getenv("HITBTC_SECRET_KEY")
+# CoinZoom Test
+coinzoom_api_key = os.getenv("COINZOOM_API_KEY")
+coinzoom_secret_key = os.getenv("COINZOOM_SECRET_KEY")
+coinzoom_username = os.getenv("COINZOOM_USERNAME")
+
# Wallet Tests
test_erc20_token_address = os.getenv("TEST_ERC20_TOKEN_ADDRESS")
web3_test_private_key_a = os.getenv("TEST_WALLET_PRIVATE_KEY_A")
diff --git a/hummingbot/connector/connector_status.py b/hummingbot/connector/connector_status.py
index 14a0968094..abe0f7dd2b 100644
--- a/hummingbot/connector/connector_status.py
+++ b/hummingbot/connector/connector_status.py
@@ -13,6 +13,7 @@
'blocktane': 'green',
'celo': 'green',
'coinbase_pro': 'green',
+ 'coinzoom': 'yellow',
'crypto_com': 'yellow',
'dydx': 'green',
'eterbase': 'red',
diff --git a/hummingbot/connector/exchange/coinzoom/__init__.py b/hummingbot/connector/exchange/coinzoom/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_active_order_tracker.pxd b/hummingbot/connector/exchange/coinzoom/coinzoom_active_order_tracker.pxd
new file mode 100644
index 0000000000..881d7862df
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_active_order_tracker.pxd
@@ -0,0 +1,13 @@
+# distutils: language=c++
+cimport numpy as np
+
+cdef class CoinzoomActiveOrderTracker:
+ cdef dict _active_bids
+ cdef dict _active_asks
+ cdef dict _active_asks_ids
+ cdef dict _active_bids_ids
+
+ cdef tuple c_convert_diff_message_to_np_arrays(self, object message)
+ cdef tuple c_convert_snapshot_message_to_np_arrays(self, object message)
+ # This method doesn't seem to be used anywhere at all
+ # cdef np.ndarray[np.float64_t, ndim=1] c_convert_trade_message_to_np_array(self, object message)
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_active_order_tracker.pyx b/hummingbot/connector/exchange/coinzoom/coinzoom_active_order_tracker.pyx
new file mode 100644
index 0000000000..a7e4fcb815
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_active_order_tracker.pyx
@@ -0,0 +1,157 @@
+# distutils: language=c++
+# distutils: sources=hummingbot/core/cpp/OrderBookEntry.cpp
+import logging
+import numpy as np
+from decimal import Decimal
+from typing import Dict
+from hummingbot.logger import HummingbotLogger
+from hummingbot.core.data_type.order_book_row import OrderBookRow
+
+_logger = None
+s_empty_diff = np.ndarray(shape=(0, 4), dtype="float64")
+CoinzoomOrderBookTrackingDictionary = Dict[Decimal, Dict[str, Dict[str, any]]]
+
+cdef class CoinzoomActiveOrderTracker:
+ def __init__(self,
+ active_asks: CoinzoomOrderBookTrackingDictionary = None,
+ active_bids: CoinzoomOrderBookTrackingDictionary = None):
+ super().__init__()
+ self._active_asks = active_asks or {}
+ self._active_bids = active_bids or {}
+ self._active_asks_ids = {}
+ self._active_bids_ids = {}
+
+ @classmethod
+ def logger(cls) -> HummingbotLogger:
+ global _logger
+ if _logger is None:
+ _logger = logging.getLogger(__name__)
+ return _logger
+
+ @property
+ def active_asks(self) -> CoinzoomOrderBookTrackingDictionary:
+ return self._active_asks
+
+ @property
+ def active_bids(self) -> CoinzoomOrderBookTrackingDictionary:
+ return self._active_bids
+
+ # TODO: research this more
+ def volume_for_ask_price(self, price) -> float:
+ return NotImplementedError
+
+ # TODO: research this more
+ def volume_for_bid_price(self, price) -> float:
+ return NotImplementedError
+
+ def get_rates_and_quantities(self, entry) -> tuple:
+ # price, quantity
+ return float(entry[0]), float(entry[1])
+
+ def get_rates_and_amts_with_ids(self, entry, id_list) -> tuple:
+ if len(entry) > 1:
+ price = float(entry[1])
+ amount = float(entry[2])
+ id_list[str(entry[0])] = price
+ else:
+ price = id_list.get(str(entry[0]))
+ amount = 0.0
+ return price, amount
+
+ cdef tuple c_convert_diff_message_to_np_arrays(self, object message):
+ cdef:
+ dict content = message.content
+ list content_keys = list(content.keys())
+ list bid_entries = []
+ list ask_entries = []
+ str order_id
+ str order_side
+ str price_raw
+ object price
+ dict order_dict
+ double timestamp = message.timestamp
+ double amount = 0
+ dict nps = {'bids': s_empty_diff, 'asks': s_empty_diff}
+
+ if "b" in content_keys:
+ bid_entries = content["b"]
+ if "s" in content_keys:
+ ask_entries = content["s"]
+
+ for entries, diff_key, id_list in [
+ (bid_entries, 'bids', self._active_bids_ids),
+ (ask_entries, 'asks', self._active_asks_ids)
+ ]:
+ if len(entries) > 0:
+ nps[diff_key] = np.array(
+ [[timestamp, price, amount, message.update_id]
+ for price, amount in [self.get_rates_and_amts_with_ids(entry, id_list) for entry in entries]
+ if price is not None],
+ dtype="float64", ndmin=2
+ )
+ return nps['bids'], nps['asks']
+
+ cdef tuple c_convert_snapshot_message_to_np_arrays(self, object message):
+ cdef:
+ float price
+ float amount
+ str order_id
+ dict order_dict
+
+ # Refresh all order tracking.
+ self._active_bids.clear()
+ self._active_asks.clear()
+ timestamp = message.timestamp
+ content = message.content
+ content_keys = list(content.keys())
+
+ if "bids" in content_keys:
+ for snapshot_orders, active_orders in [(content["bids"], self._active_bids), (content["asks"], self._active_asks)]:
+ for entry in snapshot_orders:
+ price, amount = self.get_rates_and_quantities(entry)
+ active_orders[price] = amount
+ else:
+ for snapshot_orders, active_orders, active_order_ids in [
+ (content["b"], self._active_bids, self._active_bids_ids),
+ (content["s"], self._active_asks, self._active_asks_ids)
+ ]:
+ for entry in snapshot_orders:
+ price, amount = self.get_rates_and_amts_with_ids(entry, active_order_ids)
+ active_orders[price] = amount
+
+ # Return the sorted snapshot tables.
+ cdef:
+ np.ndarray[np.float64_t, ndim=2] bids = np.array(
+ [[message.timestamp, float(price), float(self._active_bids[price]), message.update_id]
+ for price in sorted(self._active_bids.keys())], dtype='float64', ndmin=2)
+ np.ndarray[np.float64_t, ndim=2] asks = np.array(
+ [[message.timestamp, float(price), float(self._active_asks[price]), message.update_id]
+ for price in sorted(self._active_asks.keys(), reverse=True)], dtype='float64', ndmin=2)
+
+ if bids.shape[1] != 4:
+ bids = bids.reshape((0, 4))
+ if asks.shape[1] != 4:
+ asks = asks.reshape((0, 4))
+
+ return bids, asks
+
+ # This method doesn't seem to be used anywhere at all
+ # cdef np.ndarray[np.float64_t, ndim=1] c_convert_trade_message_to_np_array(self, object message):
+ # cdef:
+ # double trade_type_value = 1.0 if message.content[4] == "BUY" else 2.0
+ # list content = message.content
+
+ # return np.array([message.timestamp, trade_type_value, float(content[1]), float(content[2])],
+ # dtype="float64")
+
+ def convert_diff_message_to_order_book_row(self, message):
+ np_bids, np_asks = self.c_convert_diff_message_to_np_arrays(message)
+ bids_row = [OrderBookRow(price, qty, update_id) for ts, price, qty, update_id in np_bids]
+ asks_row = [OrderBookRow(price, qty, update_id) for ts, price, qty, update_id in np_asks]
+ return bids_row, asks_row
+
+ def convert_snapshot_message_to_order_book_row(self, message):
+ np_bids, np_asks = self.c_convert_snapshot_message_to_np_arrays(message)
+ bids_row = [OrderBookRow(price, qty, update_id) for ts, price, qty, update_id in np_bids]
+ asks_row = [OrderBookRow(price, qty, update_id) for ts, price, qty, update_id in np_asks]
+ return bids_row, asks_row
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_api_order_book_data_source.py b/hummingbot/connector/exchange/coinzoom/coinzoom_api_order_book_data_source.py
new file mode 100644
index 0000000000..49032d6f7f
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_api_order_book_data_source.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+import asyncio
+import logging
+import time
+import pandas as pd
+from decimal import Decimal
+from typing import Optional, List, Dict, Any
+from hummingbot.core.data_type.order_book import OrderBook
+from hummingbot.core.data_type.order_book_message import OrderBookMessage
+from hummingbot.core.data_type.order_book_tracker_data_source import OrderBookTrackerDataSource
+from hummingbot.logger import HummingbotLogger
+from .coinzoom_constants import Constants
+from .coinzoom_active_order_tracker import CoinzoomActiveOrderTracker
+from .coinzoom_order_book import CoinzoomOrderBook
+from .coinzoom_websocket import CoinzoomWebsocket
+from .coinzoom_utils import (
+ convert_to_exchange_trading_pair,
+ convert_from_exchange_trading_pair,
+ api_call_with_retries,
+ CoinzoomAPIError,
+)
+
+
+class CoinzoomAPIOrderBookDataSource(OrderBookTrackerDataSource):
+ _logger: Optional[HummingbotLogger] = None
+
+ @classmethod
+ def logger(cls) -> HummingbotLogger:
+ if cls._logger is None:
+ cls._logger = logging.getLogger(__name__)
+ return cls._logger
+
+ def __init__(self, trading_pairs: List[str] = None):
+ super().__init__(trading_pairs)
+ self._trading_pairs: List[str] = trading_pairs
+ self._snapshot_msg: Dict[str, any] = {}
+
+ @classmethod
+ async def get_last_traded_prices(cls, trading_pairs: List[str]) -> Dict[str, Decimal]:
+ results = {}
+ tickers: List[Dict[Any]] = await api_call_with_retries("GET", Constants.ENDPOINT["TICKER"])
+ for trading_pair in trading_pairs:
+ ex_pair: str = convert_to_exchange_trading_pair(trading_pair, True)
+ ticker: Dict[Any] = list([tic for symbol, tic in tickers.items() if symbol == ex_pair])[0]
+ results[trading_pair]: Decimal = Decimal(str(ticker["last_price"]))
+ return results
+
+ @staticmethod
+ async def fetch_trading_pairs() -> List[str]:
+ try:
+ symbols: List[Dict[str, Any]] = await api_call_with_retries("GET", Constants.ENDPOINT["SYMBOL"])
+ trading_pairs: List[str] = list([convert_from_exchange_trading_pair(sym["symbol"]) for sym in symbols])
+ # Filter out unmatched pairs so nothing breaks
+ return [sym for sym in trading_pairs if sym is not None]
+ except Exception:
+ # Do nothing if the request fails -- there will be no autocomplete for CoinZoom trading pairs
+ pass
+ return []
+
+ @staticmethod
+ async def get_order_book_data(trading_pair: str) -> Dict[str, any]:
+ """
+ Get whole orderbook
+ """
+ try:
+ ex_pair = convert_to_exchange_trading_pair(trading_pair, True)
+ ob_endpoint = Constants.ENDPOINT["ORDER_BOOK"].format(trading_pair=ex_pair)
+ orderbook_response: Dict[Any] = await api_call_with_retries("GET", ob_endpoint)
+ return orderbook_response
+ except CoinzoomAPIError as e:
+ err = e.error_payload.get('error', e.error_payload)
+ raise IOError(
+ f"Error fetching OrderBook for {trading_pair} at {Constants.EXCHANGE_NAME}. "
+ f"HTTP status is {e.error_payload['status']}. Error is {err.get('message', str(err))}.")
+
+ async def get_new_order_book(self, trading_pair: str) -> OrderBook:
+ snapshot: Dict[str, Any] = await self.get_order_book_data(trading_pair)
+ snapshot_timestamp: float = float(snapshot['timestamp'])
+ snapshot_msg: OrderBookMessage = CoinzoomOrderBook.snapshot_message_from_exchange(
+ snapshot,
+ snapshot_timestamp,
+ metadata={"trading_pair": trading_pair})
+ order_book = self.order_book_create_function()
+ active_order_tracker: CoinzoomActiveOrderTracker = CoinzoomActiveOrderTracker()
+ bids, asks = active_order_tracker.convert_snapshot_message_to_order_book_row(snapshot_msg)
+ order_book.apply_snapshot(bids, asks, snapshot_msg.update_id)
+ return order_book
+
+ async def listen_for_trades(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue):
+ """
+ Listen for trades using websocket trade channel
+ """
+ while True:
+ try:
+ ws = CoinzoomWebsocket()
+ await ws.connect()
+
+ for pair in self._trading_pairs:
+ await ws.subscribe({Constants.WS_SUB["TRADES"]: {'symbol': convert_to_exchange_trading_pair(pair)}})
+
+ async for response in ws.on_message():
+ msg_keys = list(response.keys()) if response is not None else []
+
+ if not Constants.WS_METHODS["TRADES_UPDATE"] in msg_keys:
+ continue
+
+ trade: List[Any] = response[Constants.WS_METHODS["TRADES_UPDATE"]]
+ trade_msg: OrderBookMessage = CoinzoomOrderBook.trade_message_from_exchange(trade)
+ output.put_nowait(trade_msg)
+
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ self.logger().error("Unexpected error.", exc_info=True)
+ raise
+ await asyncio.sleep(5.0)
+ finally:
+ await ws.disconnect()
+
+ async def listen_for_order_book_diffs(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue):
+ """
+ Listen for orderbook diffs using websocket book channel
+ """
+ while True:
+ try:
+ ws = CoinzoomWebsocket()
+ await ws.connect()
+
+ order_book_methods = [
+ Constants.WS_METHODS['ORDERS_SNAPSHOT'],
+ Constants.WS_METHODS['ORDERS_UPDATE'],
+ ]
+
+ for pair in self._trading_pairs:
+ ex_pair = convert_to_exchange_trading_pair(pair)
+ ws_stream = {
+ Constants.WS_SUB["ORDERS"]: {
+ 'requestId': ex_pair,
+ 'symbol': ex_pair,
+ 'aggregate': False,
+ 'depth': 0,
+ }
+ }
+ await ws.subscribe(ws_stream)
+
+ async for response in ws.on_message():
+ msg_keys = list(response.keys()) if response is not None else []
+
+ method_key = [key for key in msg_keys if key in order_book_methods]
+
+ if len(method_key) != 1:
+ continue
+
+ method: str = method_key[0]
+ order_book_data: dict = response
+ timestamp: int = int(time.time() * 1e3)
+ pair: str = convert_from_exchange_trading_pair(response[method])
+
+ order_book_msg_cls = (CoinzoomOrderBook.diff_message_from_exchange
+ if method == Constants.WS_METHODS['ORDERS_UPDATE'] else
+ CoinzoomOrderBook.snapshot_message_from_exchange)
+
+ orderbook_msg: OrderBookMessage = order_book_msg_cls(
+ order_book_data,
+ timestamp,
+ metadata={"trading_pair": pair})
+ output.put_nowait(orderbook_msg)
+
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ self.logger().network(
+ "Unexpected error with WebSocket connection.", exc_info=True,
+ app_warning_msg="Unexpected error with WebSocket connection. Retrying in 30 seconds. "
+ "Check network connection.")
+ await asyncio.sleep(30.0)
+ finally:
+ await ws.disconnect()
+
+ async def listen_for_order_book_snapshots(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue):
+ """
+ Listen for orderbook snapshots by fetching orderbook
+ """
+ while True:
+ try:
+ for trading_pair in self._trading_pairs:
+ try:
+ snapshot: Dict[str, any] = await self.get_order_book_data(trading_pair)
+ snapshot_msg: OrderBookMessage = CoinzoomOrderBook.snapshot_message_from_exchange(
+ snapshot,
+ snapshot['timestamp'],
+ metadata={"trading_pair": trading_pair}
+ )
+ output.put_nowait(snapshot_msg)
+ self.logger().debug(f"Saved order book snapshot for {trading_pair}")
+ # Be careful not to go above API rate limits.
+ await asyncio.sleep(5.0)
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ self.logger().network(
+ "Unexpected error with WebSocket connection.", exc_info=True,
+ app_warning_msg="Unexpected error with WebSocket connection. Retrying in 5 seconds. "
+ "Check network connection.")
+ await asyncio.sleep(5.0)
+ this_hour: pd.Timestamp = pd.Timestamp.utcnow().replace(minute=0, second=0, microsecond=0)
+ next_hour: pd.Timestamp = this_hour + pd.Timedelta(hours=1)
+ delta: float = next_hour.timestamp() - time.time()
+ await asyncio.sleep(delta)
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ self.logger().error("Unexpected error.", exc_info=True)
+ await asyncio.sleep(5.0)
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_api_user_stream_data_source.py b/hummingbot/connector/exchange/coinzoom/coinzoom_api_user_stream_data_source.py
new file mode 100755
index 0000000000..7aede77e89
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_api_user_stream_data_source.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+import time
+import asyncio
+import logging
+from typing import (
+ Any,
+ AsyncIterable,
+ List,
+ Optional,
+)
+from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource
+from hummingbot.logger import HummingbotLogger
+from .coinzoom_constants import Constants
+from .coinzoom_auth import CoinzoomAuth
+from .coinzoom_utils import CoinzoomAPIError
+from .coinzoom_websocket import CoinzoomWebsocket
+
+
+class CoinzoomAPIUserStreamDataSource(UserStreamTrackerDataSource):
+
+ _logger: Optional[HummingbotLogger] = None
+
+ @classmethod
+ def logger(cls) -> HummingbotLogger:
+ if cls._logger is None:
+ cls._logger = logging.getLogger(__name__)
+ return cls._logger
+
+ def __init__(self, coinzoom_auth: CoinzoomAuth, trading_pairs: Optional[List[str]] = []):
+ self._coinzoom_auth: CoinzoomAuth = coinzoom_auth
+ self._ws: CoinzoomWebsocket = None
+ self._trading_pairs = trading_pairs
+ self._current_listen_key = None
+ self._listen_for_user_stream_task = None
+ self._last_recv_time: float = 0
+ super().__init__()
+
+ @property
+ def last_recv_time(self) -> float:
+ return self._last_recv_time
+
+ async def _ws_request_balances(self):
+ return await self._ws.request(Constants.WS_METHODS["USER_BALANCE"])
+
+ async def _listen_to_orders_trades_balances(self) -> AsyncIterable[Any]:
+ """
+ Subscribe to active orders via web socket
+ """
+
+ try:
+ self._ws = CoinzoomWebsocket(self._coinzoom_auth)
+
+ await self._ws.connect()
+
+ await self._ws.subscribe({Constants.WS_SUB["USER_ORDERS_TRADES"]: {}})
+
+ event_methods = [
+ Constants.WS_METHODS["USER_ORDERS"],
+ # We don't need to know about pending cancels
+ # Constants.WS_METHODS["USER_ORDERS_CANCEL"],
+ ]
+
+ async for msg in self._ws.on_message():
+ self._last_recv_time = time.time()
+
+ msg_keys = list(msg.keys()) if msg is not None else []
+
+ if not any(ws_method in msg_keys for ws_method in event_methods):
+ continue
+ yield msg
+ except Exception as e:
+ raise e
+ finally:
+ await self._ws.disconnect()
+ await asyncio.sleep(5)
+
+ async def listen_for_user_stream(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue) -> AsyncIterable[Any]:
+ """
+ *required
+ Subscribe to user stream via web socket, and keep the connection open for incoming messages
+ :param ev_loop: ev_loop to execute this function in
+ :param output: an async queue where the incoming messages are stored
+ """
+
+ while True:
+ try:
+ async for msg in self._listen_to_orders_trades_balances():
+ output.put_nowait(msg)
+ except asyncio.CancelledError:
+ raise
+ except CoinzoomAPIError as e:
+ self.logger().error(e.error_payload.get('error'), exc_info=True)
+ raise
+ except Exception:
+ self.logger().error(
+ f"Unexpected error with {Constants.EXCHANGE_NAME} WebSocket connection. "
+ "Retrying after 30 seconds...", exc_info=True)
+ await asyncio.sleep(30.0)
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_auth.py b/hummingbot/connector/exchange/coinzoom/coinzoom_auth.py
new file mode 100755
index 0000000000..9379f3716b
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_auth.py
@@ -0,0 +1,31 @@
+from typing import Dict, Any
+
+
+class CoinzoomAuth():
+ """
+ Auth class required by CoinZoom API
+ Learn more at https://exchange-docs.crypto.com/#digital-signature
+ """
+ def __init__(self, api_key: str, secret_key: str, username: str):
+ self.api_key = api_key
+ self.secret_key = secret_key
+ self.username = username
+
+ def get_ws_params(self) -> Dict[str, str]:
+ return {
+ "apiKey": str(self.api_key),
+ "secretKey": str(self.secret_key),
+ }
+
+ def get_headers(self) -> Dict[str, Any]:
+ """
+ Generates authentication headers required by CoinZoom
+ :return: a dictionary of auth headers
+ """
+ headers = {
+ "Content-Type": "application/json",
+ "Coinzoom-Api-Key": str(self.api_key),
+ "Coinzoom-Api-Secret": str(self.secret_key),
+ "User-Agent": f"hummingbot ZoomMe: {self.username}"
+ }
+ return headers
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_constants.py b/hummingbot/connector/exchange/coinzoom/coinzoom_constants.py
new file mode 100644
index 0000000000..0cad1cb049
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_constants.py
@@ -0,0 +1,60 @@
+# A single source of truth for constant variables related to the exchange
+class Constants:
+ """
+ API Documentation Links:
+ https://api-docs.coinzoom.com/
+ https://api-markets.coinzoom.com/
+ """
+ EXCHANGE_NAME = "coinzoom"
+ REST_URL = "https://api.coinzoom.com/api/v1/public"
+ # REST_URL = "https://api.stage.coinzoom.com/api/v1/public"
+ WS_PRIVATE_URL = "wss://api.coinzoom.com/api/v1/public/market/data/stream"
+ # WS_PRIVATE_URL = "wss://api.stage.coinzoom.com/api/v1/public/market/data/stream"
+ WS_PUBLIC_URL = "wss://api.coinzoom.com/api/v1/public/market/data/stream"
+ # WS_PUBLIC_URL = "wss://api.stage.coinzoom.com/api/v1/public/market/data/stream"
+
+ HBOT_BROKER_ID = "CZ_API_HBOT"
+
+ ENDPOINT = {
+ # Public Endpoints
+ "TICKER": "marketwatch/ticker",
+ "SYMBOL": "instruments",
+ "ORDER_BOOK": "marketwatch/orderbook/{trading_pair}/150/2",
+ "ORDER_CREATE": "orders/new",
+ "ORDER_DELETE": "orders/cancel",
+ "ORDER_STATUS": "orders/list",
+ "USER_ORDERS": "orders/list",
+ "USER_BALANCES": "ledger/list",
+ }
+
+ WS_SUB = {
+ "TRADES": "TradeSummaryRequest",
+ "ORDERS": "OrderBookRequest",
+ "USER_ORDERS_TRADES": "OrderUpdateRequest",
+
+ }
+
+ WS_METHODS = {
+ "ORDERS_SNAPSHOT": "ob",
+ "ORDERS_UPDATE": "oi",
+ "TRADES_UPDATE": "ts",
+ "USER_BALANCE": "getTradingBalance",
+ "USER_ORDERS": "OrderResponse",
+ "USER_ORDERS_CANCEL": "OrderCancelResponse",
+ }
+
+ # Timeouts
+ MESSAGE_TIMEOUT = 30.0
+ PING_TIMEOUT = 10.0
+ API_CALL_TIMEOUT = 10.0
+ API_MAX_RETRIES = 4
+
+ # Intervals
+ # Only used when nothing is received from WS
+ SHORT_POLL_INTERVAL = 5.0
+ # One minute should be fine since we request balance updates on order updates
+ LONG_POLL_INTERVAL = 60.0
+ # One minute should be fine for order status since we get these via WS
+ UPDATE_ORDER_STATUS_INTERVAL = 60.0
+ # 10 minute interval to update trading rules, these would likely never change whilst running.
+ INTERVAL_TRADING_RULES = 600
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_exchange.py b/hummingbot/connector/exchange/coinzoom/coinzoom_exchange.py
new file mode 100644
index 0000000000..65108d7475
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_exchange.py
@@ -0,0 +1,919 @@
+import logging
+from typing import (
+ Dict,
+ List,
+ Optional,
+ Any,
+ AsyncIterable,
+)
+from decimal import Decimal
+import asyncio
+import aiohttp
+import math
+import time
+import ujson
+from async_timeout import timeout
+
+from hummingbot.core.network_iterator import NetworkStatus
+from hummingbot.logger import HummingbotLogger
+from hummingbot.core.clock import Clock
+from hummingbot.core.utils.asyncio_throttle import Throttler
+from hummingbot.core.utils.async_utils import safe_ensure_future, safe_gather
+from hummingbot.connector.trading_rule import TradingRule
+from hummingbot.core.data_type.cancellation_result import CancellationResult
+from hummingbot.core.data_type.order_book import OrderBook
+from hummingbot.core.data_type.limit_order import LimitOrder
+from hummingbot.core.event.events import (
+ MarketEvent,
+ BuyOrderCompletedEvent,
+ SellOrderCompletedEvent,
+ OrderFilledEvent,
+ OrderCancelledEvent,
+ BuyOrderCreatedEvent,
+ SellOrderCreatedEvent,
+ MarketOrderFailureEvent,
+ OrderType,
+ TradeType,
+ TradeFee
+)
+from hummingbot.connector.exchange_base import ExchangeBase
+from hummingbot.connector.exchange.coinzoom.coinzoom_order_book_tracker import CoinzoomOrderBookTracker
+from hummingbot.connector.exchange.coinzoom.coinzoom_user_stream_tracker import CoinzoomUserStreamTracker
+from hummingbot.connector.exchange.coinzoom.coinzoom_auth import CoinzoomAuth
+from hummingbot.connector.exchange.coinzoom.coinzoom_in_flight_order import CoinzoomInFlightOrder
+from hummingbot.connector.exchange.coinzoom.coinzoom_utils import (
+ convert_from_exchange_trading_pair,
+ convert_to_exchange_trading_pair,
+ get_new_client_order_id,
+ aiohttp_response_with_errors,
+ retry_sleep_time,
+ str_date_to_ts,
+ CoinzoomAPIError,
+)
+from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants
+from hummingbot.core.data_type.common import OpenOrder
+ctce_logger = None
+s_decimal_NaN = Decimal("nan")
+
+
+class CoinzoomExchange(ExchangeBase):
+ """
+ CoinzoomExchange connects with CoinZoom exchange and provides order book pricing, user account tracking and
+ trading functionality.
+ """
+ ORDER_NOT_EXIST_CONFIRMATION_COUNT = 3
+ ORDER_NOT_EXIST_CANCEL_COUNT = 2
+
+ @classmethod
+ def logger(cls) -> HummingbotLogger:
+ global ctce_logger
+ if ctce_logger is None:
+ ctce_logger = logging.getLogger(__name__)
+ return ctce_logger
+
+ def __init__(self,
+ coinzoom_api_key: str,
+ coinzoom_secret_key: str,
+ coinzoom_username: str,
+ trading_pairs: Optional[List[str]] = None,
+ trading_required: bool = True
+ ):
+ """
+ :param coinzoom_api_key: The API key to connect to private CoinZoom APIs.
+ :param coinzoom_secret_key: The API secret.
+ :param coinzoom_username: The ZoomMe Username.
+ :param trading_pairs: The market trading pairs which to track order book data.
+ :param trading_required: Whether actual trading is needed.
+ """
+ super().__init__()
+ self._trading_required = trading_required
+ self._trading_pairs = trading_pairs
+ self._coinzoom_auth = CoinzoomAuth(coinzoom_api_key, coinzoom_secret_key, coinzoom_username)
+ self._order_book_tracker = CoinzoomOrderBookTracker(trading_pairs=trading_pairs)
+ self._user_stream_tracker = CoinzoomUserStreamTracker(self._coinzoom_auth, trading_pairs)
+ self._ev_loop = asyncio.get_event_loop()
+ self._shared_client = None
+ self._poll_notifier = asyncio.Event()
+ self._last_timestamp = 0
+ self._in_flight_orders = {} # Dict[client_order_id:str, CoinzoomInFlightOrder]
+ self._order_not_found_records = {} # Dict[client_order_id:str, count:int]
+ self._trading_rules = {} # Dict[trading_pair:str, TradingRule]
+ self._status_polling_task = None
+ self._user_stream_event_listener_task = None
+ self._trading_rules_polling_task = None
+ self._last_poll_timestamp = 0
+ self._throttler = Throttler(rate_limit = (8.0, 6))
+
+ @property
+ def name(self) -> str:
+ return "coinzoom"
+
+ @property
+ def order_books(self) -> Dict[str, OrderBook]:
+ return self._order_book_tracker.order_books
+
+ @property
+ def trading_rules(self) -> Dict[str, TradingRule]:
+ return self._trading_rules
+
+ @property
+ def in_flight_orders(self) -> Dict[str, CoinzoomInFlightOrder]:
+ return self._in_flight_orders
+
+ @property
+ def status_dict(self) -> Dict[str, bool]:
+ """
+ A dictionary of statuses of various connector's components.
+ """
+ return {
+ "order_books_initialized": self._order_book_tracker.ready,
+ "account_balance": len(self._account_balances) > 0 if self._trading_required else True,
+ "trading_rule_initialized": len(self._trading_rules) > 0,
+ "user_stream_initialized":
+ self._user_stream_tracker.data_source.last_recv_time > 0 if self._trading_required else True,
+ }
+
+ @property
+ def ready(self) -> bool:
+ """
+ :return True when all statuses pass, this might take 5-10 seconds for all the connector's components and
+ services to be ready.
+ """
+ return all(self.status_dict.values())
+
+ @property
+ def limit_orders(self) -> List[LimitOrder]:
+ return [
+ in_flight_order.to_limit_order()
+ for in_flight_order in self._in_flight_orders.values()
+ ]
+
+ @property
+ def tracking_states(self) -> Dict[str, any]:
+ """
+ :return active in-flight orders in json format, is used to save in sqlite db.
+ """
+ return {
+ key: value.to_json()
+ for key, value in self._in_flight_orders.items()
+ if not value.is_done
+ }
+
+ def restore_tracking_states(self, saved_states: Dict[str, any]):
+ """
+ Restore in-flight orders from saved tracking states, this is st the connector can pick up on where it left off
+ when it disconnects.
+ :param saved_states: The saved tracking_states.
+ """
+ self._in_flight_orders.update({
+ key: CoinzoomInFlightOrder.from_json(value)
+ for key, value in saved_states.items()
+ })
+
+ def supported_order_types(self) -> List[OrderType]:
+ """
+ :return a list of OrderType supported by this connector.
+ Note that Market order type is no longer required and will not be used.
+ """
+ return [OrderType.LIMIT, OrderType.LIMIT_MAKER]
+
+ def start(self, clock: Clock, timestamp: float):
+ """
+ This function is called automatically by the clock.
+ """
+ super().start(clock, timestamp)
+
+ def stop(self, clock: Clock):
+ """
+ This function is called automatically by the clock.
+ """
+ super().stop(clock)
+
+ async def start_network(self):
+ """
+ This function is required by NetworkIterator base class and is called automatically.
+ It starts tracking order book, polling trading rules,
+ updating statuses and tracking user data.
+ """
+ self._order_book_tracker.start()
+ self._trading_rules_polling_task = safe_ensure_future(self._trading_rules_polling_loop())
+ if self._trading_required:
+ self._status_polling_task = safe_ensure_future(self._status_polling_loop())
+ self._user_stream_tracker_task = safe_ensure_future(self._user_stream_tracker.start())
+ self._user_stream_event_listener_task = safe_ensure_future(self._user_stream_event_listener())
+
+ async def stop_network(self):
+ """
+ This function is required by NetworkIterator base class and is called automatically.
+ """
+ self._order_book_tracker.stop()
+ if self._status_polling_task is not None:
+ self._status_polling_task.cancel()
+ self._status_polling_task = None
+ if self._trading_rules_polling_task is not None:
+ self._trading_rules_polling_task.cancel()
+ self._trading_rules_polling_task = None
+ if self._status_polling_task is not None:
+ self._status_polling_task.cancel()
+ self._status_polling_task = None
+ if self._user_stream_tracker_task is not None:
+ self._user_stream_tracker_task.cancel()
+ self._user_stream_tracker_task = None
+ if self._user_stream_event_listener_task is not None:
+ self._user_stream_event_listener_task.cancel()
+ self._user_stream_event_listener_task = None
+
+ async def check_network(self) -> NetworkStatus:
+ """
+ This function is required by NetworkIterator base class and is called periodically to check
+ the network connection. Simply ping the network (or call any light weight public API).
+ """
+ try:
+ # since there is no ping endpoint, the lowest rate call is to get BTC-USD symbol
+ await self._api_request("GET", Constants.ENDPOINT['SYMBOL'])
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ return NetworkStatus.NOT_CONNECTED
+ return NetworkStatus.CONNECTED
+
+ async def _http_client(self) -> aiohttp.ClientSession:
+ """
+ :returns Shared client session instance
+ """
+ if self._shared_client is None:
+ self._shared_client = aiohttp.ClientSession()
+ return self._shared_client
+
+ async def _trading_rules_polling_loop(self):
+ """
+ Periodically update trading rule.
+ """
+ while True:
+ try:
+ await self._update_trading_rules()
+ await asyncio.sleep(Constants.INTERVAL_TRADING_RULES)
+ except asyncio.CancelledError:
+ raise
+ except Exception as e:
+ self.logger().network(f"Unexpected error while fetching trading rules. Error: {str(e)}",
+ exc_info=True,
+ app_warning_msg=("Could not fetch new trading rules from "
+ f"{Constants.EXCHANGE_NAME}. Check network connection."))
+ await asyncio.sleep(0.5)
+
+ async def _update_trading_rules(self):
+ symbols_info = await self._api_request("GET", endpoint=Constants.ENDPOINT['SYMBOL'])
+ self._trading_rules.clear()
+ self._trading_rules = self._format_trading_rules(symbols_info)
+
+ def _format_trading_rules(self, symbols_info: Dict[str, Any]) -> Dict[str, TradingRule]:
+ """
+ Converts json API response into a dictionary of trading rules.
+ :param symbols_info: The json API response
+ :return A dictionary of trading rules.
+ Response Example:
+ [
+ {
+ "symbol" : "BTC/USD",
+ "baseCurrencyCode" : "BTC",
+ "termCurrencyCode" : "USD",
+ "minTradeAmt" : 0.0001,
+ "maxTradeAmt" : 10,
+ "maxPricePrecision" : 2,
+ "maxQuantityPrecision" : 6,
+ "issueOnly" : false
+ }
+ ]
+ """
+ result = {}
+ for rule in symbols_info:
+ try:
+ trading_pair = convert_from_exchange_trading_pair(rule["symbol"])
+ min_amount = Decimal(str(rule["minTradeAmt"]))
+ min_price = Decimal(f"1e-{rule['maxPricePrecision']}")
+ result[trading_pair] = TradingRule(trading_pair,
+ min_order_size=min_amount,
+ max_order_size=Decimal(str(rule["maxTradeAmt"])),
+ min_price_increment=min_price,
+ min_base_amount_increment=min_amount,
+ min_notional_size=min(min_price * min_amount, Decimal("0.00000001")),
+ max_price_significant_digits=Decimal(str(rule["maxPricePrecision"])),
+ )
+ except Exception:
+ self.logger().error(f"Error parsing the trading pair rule {rule}. Skipping.", exc_info=True)
+ return result
+
+ async def _api_request(self,
+ method: str,
+ endpoint: str,
+ params: Optional[Dict[str, Any]] = None,
+ is_auth_required: bool = False,
+ try_count: int = 0) -> Dict[str, Any]:
+ """
+ Sends an aiohttp request and waits for a response.
+ :param method: The HTTP method, e.g. get or post
+ :param endpoint: The path url or the API end point
+ :param params: Additional get/post parameters
+ :param is_auth_required: Whether an authentication is required, when True the function will add encrypted
+ signature to the request.
+ :returns A response in json format.
+ """
+ async with self._throttler.weighted_task(request_weight=1):
+ url = f"{Constants.REST_URL}/{endpoint}"
+ shared_client = await self._http_client()
+ # Turn `params` into either GET params or POST body data
+ qs_params: dict = params if method.upper() == "GET" else None
+ req_params = ujson.dumps(params) if method.upper() == "POST" and params is not None else None
+ # Generate auth headers if needed.
+ headers: dict = {"Content-Type": "application/json", "User-Agent": "hummingbot"}
+ if is_auth_required:
+ headers: dict = self._coinzoom_auth.get_headers()
+ # Build request coro
+ response_coro = shared_client.request(method=method.upper(), url=url, headers=headers,
+ params=qs_params, data=req_params,
+ timeout=Constants.API_CALL_TIMEOUT)
+ http_status, parsed_response, request_errors = await aiohttp_response_with_errors(response_coro)
+ if request_errors or parsed_response is None:
+ if try_count < Constants.API_MAX_RETRIES:
+ try_count += 1
+ time_sleep = retry_sleep_time(try_count)
+ self.logger().info(f"Error fetching data from {url}. HTTP status is {http_status}. "
+ f"Retrying in {time_sleep:.0f}s.")
+ await asyncio.sleep(time_sleep)
+ return await self._api_request(method=method, endpoint=endpoint, params=params,
+ is_auth_required=is_auth_required, try_count=try_count)
+ else:
+ raise CoinzoomAPIError({"error": parsed_response, "status": http_status})
+ if "error" in parsed_response:
+ raise CoinzoomAPIError(parsed_response)
+ return parsed_response
+
+ def get_order_price_quantum(self, trading_pair: str, price: Decimal):
+ """
+ Returns a price step, a minimum price increment for a given trading pair.
+ """
+ trading_rule = self._trading_rules[trading_pair]
+ return trading_rule.min_price_increment
+
+ def get_order_size_quantum(self, trading_pair: str, order_size: Decimal):
+ """
+ Returns an order amount step, a minimum amount increment for a given trading pair.
+ """
+ trading_rule = self._trading_rules[trading_pair]
+ return Decimal(trading_rule.min_base_amount_increment)
+
+ def get_order_book(self, trading_pair: str) -> OrderBook:
+ if trading_pair not in self._order_book_tracker.order_books:
+ raise ValueError(f"No order book exists for '{trading_pair}'.")
+ return self._order_book_tracker.order_books[trading_pair]
+
+ def buy(self, trading_pair: str, amount: Decimal, order_type=OrderType.MARKET,
+ price: Decimal = s_decimal_NaN, **kwargs) -> str:
+ """
+ Buys an amount of base asset (of the given trading pair). This function returns immediately.
+ To see an actual order, you'll have to wait for BuyOrderCreatedEvent.
+ :param trading_pair: The market (e.g. BTC-USDT) to buy from
+ :param amount: The amount in base token value
+ :param order_type: The order type
+ :param price: The price (note: this is no longer optional)
+ :returns A new internal order id
+ """
+ order_id: str = get_new_client_order_id(True, trading_pair)
+ safe_ensure_future(self._create_order(TradeType.BUY, order_id, trading_pair, amount, order_type, price))
+ return order_id
+
+ def sell(self, trading_pair: str, amount: Decimal, order_type=OrderType.MARKET,
+ price: Decimal = s_decimal_NaN, **kwargs) -> str:
+ """
+ Sells an amount of base asset (of the given trading pair). This function returns immediately.
+ To see an actual order, you'll have to wait for SellOrderCreatedEvent.
+ :param trading_pair: The market (e.g. BTC-USDT) to sell from
+ :param amount: The amount in base token value
+ :param order_type: The order type
+ :param price: The price (note: this is no longer optional)
+ :returns A new internal order id
+ """
+ order_id: str = get_new_client_order_id(False, trading_pair)
+ safe_ensure_future(self._create_order(TradeType.SELL, order_id, trading_pair, amount, order_type, price))
+ return order_id
+
+ def cancel(self, trading_pair: str, order_id: str):
+ """
+ Cancel an order. This function returns immediately.
+ To get the cancellation result, you'll have to wait for OrderCancelledEvent.
+ :param trading_pair: The market (e.g. BTC-USDT) of the order.
+ :param order_id: The internal order id (also called client_order_id)
+ """
+ safe_ensure_future(self._execute_cancel(trading_pair, order_id))
+ return order_id
+
+ async def _create_order(self,
+ trade_type: TradeType,
+ order_id: str,
+ trading_pair: str,
+ amount: Decimal,
+ order_type: OrderType,
+ price: Decimal):
+ """
+ Calls create-order API end point to place an order, starts tracking the order and triggers order created event.
+ :param trade_type: BUY or SELL
+ :param order_id: Internal order id (also called client_order_id)
+ :param trading_pair: The market to place order
+ :param amount: The order amount (in base token value)
+ :param order_type: The order type
+ :param price: The order price
+ """
+ if not order_type.is_limit_type():
+ raise Exception(f"Unsupported order type: {order_type}")
+ trading_rule = self._trading_rules[trading_pair]
+
+ amount = self.quantize_order_amount(trading_pair, amount)
+ price = self.quantize_order_price(trading_pair, price)
+ if amount < trading_rule.min_order_size:
+ raise ValueError(f"Buy order amount {amount} is lower than the minimum order size "
+ f"{trading_rule.min_order_size}.")
+ order_type_str = order_type.name.upper().split("_")[0]
+ api_params = {"symbol": convert_to_exchange_trading_pair(trading_pair),
+ "orderType": order_type_str,
+ "orderSide": trade_type.name.upper(),
+ "quantity": f"{amount:f}",
+ "price": f"{price:f}",
+ "originType": Constants.HBOT_BROKER_ID,
+ # CoinZoom doesn't support client order id yet
+ # "clientOrderId": order_id,
+ "payFeesWithZoomToken": "true",
+ }
+ self.start_tracking_order(order_id, None, trading_pair, trade_type, price, amount, order_type)
+ try:
+ order_result = await self._api_request("POST", Constants.ENDPOINT["ORDER_CREATE"], api_params, True)
+ exchange_order_id = str(order_result)
+ tracked_order = self._in_flight_orders.get(order_id)
+ if tracked_order is not None:
+ self.logger().info(f"Created {order_type.name} {trade_type.name} order {order_id} for "
+ f"{amount} {trading_pair}.")
+ tracked_order.update_exchange_order_id(exchange_order_id)
+ if trade_type is TradeType.BUY:
+ event_tag = MarketEvent.BuyOrderCreated
+ event_cls = BuyOrderCreatedEvent
+ else:
+ event_tag = MarketEvent.SellOrderCreated
+ event_cls = SellOrderCreatedEvent
+ self.trigger_event(event_tag,
+ event_cls(self.current_timestamp, order_type, trading_pair, amount, price, order_id))
+ except asyncio.CancelledError:
+ raise
+ except CoinzoomAPIError as e:
+ error_reason = e.error_payload.get('error', {}).get('message')
+ self.stop_tracking_order(order_id)
+ self.logger().network(
+ f"Error submitting {trade_type.name} {order_type.name} order to {Constants.EXCHANGE_NAME} for "
+ f"{amount} {trading_pair} {price} - {error_reason}.",
+ exc_info=True,
+ app_warning_msg=(f"Error submitting order to {Constants.EXCHANGE_NAME} - {error_reason}.")
+ )
+ self.trigger_event(MarketEvent.OrderFailure,
+ MarketOrderFailureEvent(self.current_timestamp, order_id, order_type))
+
+ def start_tracking_order(self,
+ order_id: str,
+ exchange_order_id: str,
+ trading_pair: str,
+ trade_type: TradeType,
+ price: Decimal,
+ amount: Decimal,
+ order_type: OrderType):
+ """
+ Starts tracking an order by simply adding it into _in_flight_orders dictionary.
+ """
+ self._in_flight_orders[order_id] = CoinzoomInFlightOrder(
+ client_order_id=order_id,
+ exchange_order_id=exchange_order_id,
+ trading_pair=trading_pair,
+ order_type=order_type,
+ trade_type=trade_type,
+ price=price,
+ amount=amount
+ )
+
+ def stop_tracking_order(self, order_id: str):
+ """
+ Stops tracking an order by simply removing it from _in_flight_orders dictionary.
+ """
+ if order_id in self._in_flight_orders:
+ del self._in_flight_orders[order_id]
+ if order_id in self._order_not_found_records:
+ del self._order_not_found_records[order_id]
+
+ async def _execute_cancel(self, trading_pair: str, order_id: str) -> str:
+ """
+ Executes order cancellation process by first calling cancel-order API. The API result doesn't confirm whether
+ the cancellation is successful, it simply states it receives the request.
+ :param trading_pair: The market trading pair (Unused during cancel on CoinZoom)
+ :param order_id: The internal order id
+ order.last_state to change to CANCELED
+ """
+ order_was_cancelled = False
+ try:
+ tracked_order = self._in_flight_orders.get(order_id)
+ if tracked_order is None:
+ raise ValueError(f"Failed to cancel order - {order_id}. Order not found.")
+ if tracked_order.exchange_order_id is None:
+ await tracked_order.get_exchange_order_id()
+ ex_order_id = tracked_order.exchange_order_id
+ api_params = {
+ "orderId": ex_order_id,
+ "symbol": convert_to_exchange_trading_pair(trading_pair)
+ }
+ await self._api_request("POST",
+ Constants.ENDPOINT["ORDER_DELETE"],
+ api_params,
+ is_auth_required=True)
+ order_was_cancelled = True
+ except asyncio.CancelledError:
+ raise
+ except CoinzoomAPIError as e:
+ err = e.error_payload.get('error', e.error_payload)
+ self.logger().error(f"Order Cancel API Error: {err}")
+ # CoinZoom doesn't report any error if the order wasn't found so we can only handle API failures here.
+ self._order_not_found_records[order_id] = self._order_not_found_records.get(order_id, 0) + 1
+ if self._order_not_found_records[order_id] >= self.ORDER_NOT_EXIST_CANCEL_COUNT:
+ order_was_cancelled = True
+ if order_was_cancelled:
+ self.logger().info(f"Successfully cancelled order {order_id} on {Constants.EXCHANGE_NAME}.")
+ self.stop_tracking_order(order_id)
+ self.trigger_event(MarketEvent.OrderCancelled,
+ OrderCancelledEvent(self.current_timestamp, order_id))
+ tracked_order.cancelled_event.set()
+ return CancellationResult(order_id, True)
+ else:
+ self.logger().network(
+ f"Failed to cancel order {order_id}: {err.get('message', str(err))}",
+ exc_info=True,
+ app_warning_msg=f"Failed to cancel the order {order_id} on {Constants.EXCHANGE_NAME}. "
+ f"Check API key and network connection."
+ )
+ return CancellationResult(order_id, False)
+
+ async def _status_polling_loop(self):
+ """
+ Periodically update user balances and order status via REST API. This serves as a fallback measure for web
+ socket API updates.
+ """
+ while True:
+ try:
+ self._poll_notifier = asyncio.Event()
+ await self._poll_notifier.wait()
+ await safe_gather(
+ self._update_balances(),
+ self._update_order_status(),
+ )
+ self._last_poll_timestamp = self.current_timestamp
+ except asyncio.CancelledError:
+ raise
+ except Exception as e:
+ self.logger().error(str(e), exc_info=True)
+ warn_msg = (f"Could not fetch account updates from {Constants.EXCHANGE_NAME}. "
+ "Check API key and network connection.")
+ self.logger().network("Unexpected error while fetching account updates.", exc_info=True,
+ app_warning_msg=warn_msg)
+ await asyncio.sleep(0.5)
+
+ async def _update_balances(self):
+ """
+ Calls REST API to update total and available balances.
+ """
+ account_info = await self._api_request("GET", Constants.ENDPOINT["USER_BALANCES"], is_auth_required=True)
+ self._process_balance_message(account_info)
+
+ async def _update_order_status(self):
+ """
+ Calls REST API to get status update for each in-flight order.
+ """
+ last_tick = int(self._last_poll_timestamp / Constants.UPDATE_ORDER_STATUS_INTERVAL)
+ current_tick = int(self.current_timestamp / Constants.UPDATE_ORDER_STATUS_INTERVAL)
+
+ if current_tick > last_tick and len(self._in_flight_orders) > 0:
+ tracked_orders = list(self._in_flight_orders.values())
+ api_params = {
+ 'symbol': None,
+ 'orderSide': None,
+ 'orderStatuses': ["NEW", "PARTIALLY_FILLED"],
+ 'size': 500,
+ 'bookmarkOrderId': None
+ }
+ self.logger().debug(f"Polling for order status updates of {len(tracked_orders)} orders.")
+ open_orders = await self._api_request("POST",
+ Constants.ENDPOINT["ORDER_STATUS"],
+ api_params,
+ is_auth_required=True)
+
+ open_orders_dict = {o['id']: o for o in open_orders}
+ found_ex_order_ids = list(open_orders_dict.keys())
+
+ for tracked_order in tracked_orders:
+ client_order_id = tracked_order.client_order_id
+ ex_order_id = tracked_order.exchange_order_id
+ if ex_order_id not in found_ex_order_ids:
+ self._order_not_found_records[client_order_id] = \
+ self._order_not_found_records.get(client_order_id, 0) + 1
+ if self._order_not_found_records[client_order_id] < self.ORDER_NOT_EXIST_CONFIRMATION_COUNT:
+ # Wait until the order is not found a few times before actually treating it as failed.
+ continue
+ self.trigger_event(MarketEvent.OrderFailure,
+ MarketOrderFailureEvent(
+ self.current_timestamp, client_order_id, tracked_order.order_type))
+ self.stop_tracking_order(client_order_id)
+ else:
+ self._process_order_message(open_orders_dict[ex_order_id])
+
+ def _process_order_message(self, order_msg: Dict[str, Any]):
+ """
+ Updates in-flight order and triggers cancellation or failure event if needed.
+ :param order_msg: The order response from either REST or web socket API (they are of the same format)
+ Example Orders:
+ REST request
+ {
+ "id" : "977f82aa-23dc-4c8b-982c-2ee7d2002882",
+ "clientOrderId" : null,
+ "symbol" : "BTC/USD",
+ "orderType" : "LIMIT",
+ "orderSide" : "BUY",
+ "quantity" : 0.1,
+ "price" : 54570,
+ "payFeesWithZoomToken" : false,
+ "orderStatus" : "PARTIALLY_FILLED",
+ "timestamp" : "2021-03-24T04:07:26.260253Z",
+ "executions" :
+ [
+ {
+ "id" : "38761582-2b37-4e27-a561-434981d21a96",
+ "executionType" : "PARTIAL_FILL",
+ "orderStatus" : "PARTIALLY_FILLED",
+ "lastPrice" : 54570,
+ "averagePrice" : 54570,
+ "lastQuantity" : 0.01,
+ "leavesQuantity" : 0.09,
+ "cumulativeQuantity" : 0.01,
+ "rejectReason" : null,
+ "timestamp" : "2021-03-24T04:07:44.503222Z"
+ }
+ ]
+ }
+ WS request
+ {
+ 'orderId': '962a2a54-fbcf-4d89-8f37-a8854020a823',
+ 'symbol': 'BTC/USD', 'orderType': 'LIMIT',
+ 'orderSide': 'BUY',
+ 'price': 5000,
+ 'quantity': 0.001,
+ 'executionType': 'CANCEL',
+ 'orderStatus': 'CANCELLED',
+ 'lastQuantity': 0,
+ 'leavesQuantity': 0,
+ 'cumulativeQuantity': 0,
+ 'transactTime': '2021-03-23T19:06:51.155520Z'
+
+ ... Optional fields
+
+ 'id': '4eb3f26c-91bd-4bd2-bacb-15b2f432c452',
+ "orderType": "LIMIT",
+ "lastPrice": 56518.7,
+ "averagePrice": 56518.7,
+ }
+ """
+ # Looks like CoinZoom might support clientOrderId eventually so leaving this here for now.
+ # if order_msg.get('clientOrderId') is not None:
+ # client_order_id = order_msg["clientOrderId"]
+ # if client_order_id not in self._in_flight_orders:
+ # return
+ # tracked_order = self._in_flight_orders[client_order_id]
+ # else:
+ if "orderId" not in order_msg:
+ exchange_order_id = str(order_msg["id"])
+ else:
+ exchange_order_id = str(order_msg["orderId"])
+ tracked_orders = list(self._in_flight_orders.values())
+ track_order = [o for o in tracked_orders if exchange_order_id == o.exchange_order_id]
+ if not track_order:
+ return
+ tracked_order = track_order[0]
+
+ # Estimate fee
+ order_msg["trade_fee"] = self.estimate_fee_pct(tracked_order.order_type is OrderType.LIMIT_MAKER)
+ updated = tracked_order.update_with_order_update(order_msg)
+ # Call Update balances on every message to catch order create, fill and cancel.
+ safe_ensure_future(self._update_balances())
+
+ if updated:
+ safe_ensure_future(self._trigger_order_fill(tracked_order, order_msg))
+ elif tracked_order.is_cancelled:
+ self.logger().info(f"Successfully cancelled order {tracked_order.client_order_id}.")
+ self.stop_tracking_order(tracked_order.client_order_id)
+ self.trigger_event(MarketEvent.OrderCancelled,
+ OrderCancelledEvent(self.current_timestamp, tracked_order.client_order_id))
+ tracked_order.cancelled_event.set()
+ elif tracked_order.is_failure:
+ self.logger().info(f"The order {tracked_order.client_order_id} has failed according to order status API. ")
+ self.trigger_event(MarketEvent.OrderFailure,
+ MarketOrderFailureEvent(
+ self.current_timestamp, tracked_order.client_order_id, tracked_order.order_type))
+ self.stop_tracking_order(tracked_order.client_order_id)
+
+ async def _trigger_order_fill(self,
+ tracked_order: CoinzoomInFlightOrder,
+ update_msg: Dict[str, Any]):
+ self.trigger_event(
+ MarketEvent.OrderFilled,
+ OrderFilledEvent(
+ self.current_timestamp,
+ tracked_order.client_order_id,
+ tracked_order.trading_pair,
+ tracked_order.trade_type,
+ tracked_order.order_type,
+ Decimal(str(update_msg.get("averagePrice", update_msg.get("price", "0")))),
+ tracked_order.executed_amount_base,
+ TradeFee(percent=update_msg["trade_fee"]),
+ update_msg.get("exchange_trade_id", update_msg.get("id", update_msg.get("orderId")))
+ )
+ )
+ if math.isclose(tracked_order.executed_amount_base, tracked_order.amount) or \
+ tracked_order.executed_amount_base >= tracked_order.amount or \
+ tracked_order.is_done:
+ tracked_order.last_state = "FILLED"
+ self.logger().info(f"The {tracked_order.trade_type.name} order "
+ f"{tracked_order.client_order_id} has completed "
+ f"according to order status API.")
+ event_tag = MarketEvent.BuyOrderCompleted if tracked_order.trade_type is TradeType.BUY \
+ else MarketEvent.SellOrderCompleted
+ event_class = BuyOrderCompletedEvent if tracked_order.trade_type is TradeType.BUY \
+ else SellOrderCompletedEvent
+ await asyncio.sleep(0.1)
+ self.trigger_event(event_tag,
+ event_class(self.current_timestamp,
+ tracked_order.client_order_id,
+ tracked_order.base_asset,
+ tracked_order.quote_asset,
+ tracked_order.fee_asset,
+ tracked_order.executed_amount_base,
+ tracked_order.executed_amount_quote,
+ tracked_order.fee_paid,
+ tracked_order.order_type))
+ self.stop_tracking_order(tracked_order.client_order_id)
+
+ def _process_balance_message(self, balance_update):
+ local_asset_names = set(self._account_balances.keys())
+ remote_asset_names = set()
+ for account in balance_update:
+ asset_name = account["currency"]
+ total_bal = Decimal(str(account["totalBalance"]))
+ self._account_available_balances[asset_name] = total_bal + Decimal(str(account["reservedBalance"]))
+ self._account_balances[asset_name] = total_bal
+ remote_asset_names.add(asset_name)
+
+ asset_names_to_remove = local_asset_names.difference(remote_asset_names)
+ for asset_name in asset_names_to_remove:
+ del self._account_available_balances[asset_name]
+ del self._account_balances[asset_name]
+
+ async def cancel_all(self, timeout_seconds: float) -> List[CancellationResult]:
+ """
+ Cancels all in-flight orders and waits for cancellation results.
+ Used by bot's top level stop and exit commands (cancelling outstanding orders on exit)
+ :param timeout_seconds: The timeout at which the operation will be canceled.
+ :returns List of CancellationResult which indicates whether each order is successfully cancelled.
+ """
+ # if self._trading_pairs is None:
+ # raise Exception("cancel_all can only be used when trading_pairs are specified.")
+ open_orders = [o for o in self._in_flight_orders.values() if not o.is_done]
+ if len(open_orders) == 0:
+ return []
+ tasks = [self._execute_cancel(o.trading_pair, o.client_order_id) for o in open_orders]
+ cancellation_results = []
+ try:
+ async with timeout(timeout_seconds):
+ cancellation_results = await safe_gather(*tasks, return_exceptions=False)
+ except Exception:
+ self.logger().network(
+ "Unexpected error cancelling orders.", exc_info=True,
+ app_warning_msg=(f"Failed to cancel all orders on {Constants.EXCHANGE_NAME}. "
+ "Check API key and network connection.")
+ )
+ return cancellation_results
+
+ def tick(self, timestamp: float):
+ """
+ Is called automatically by the clock for each clock's tick (1 second by default).
+ It checks if status polling task is due for execution.
+ """
+ now = time.time()
+ poll_interval = (Constants.SHORT_POLL_INTERVAL
+ if now - self._user_stream_tracker.last_recv_time > 60.0
+ else Constants.LONG_POLL_INTERVAL)
+ last_tick = int(self._last_timestamp / poll_interval)
+ current_tick = int(timestamp / poll_interval)
+ if current_tick > last_tick:
+ if not self._poll_notifier.is_set():
+ self._poll_notifier.set()
+ self._last_timestamp = timestamp
+
+ def get_fee(self,
+ base_currency: str,
+ quote_currency: str,
+ order_type: OrderType,
+ order_side: TradeType,
+ amount: Decimal,
+ price: Decimal = s_decimal_NaN) -> TradeFee:
+ """
+ To get trading fee, this function is simplified by using fee override configuration. Most parameters to this
+ function are ignore except order_type. Use OrderType.LIMIT_MAKER to specify you want trading fee for
+ maker order.
+ """
+ is_maker = order_type is OrderType.LIMIT_MAKER
+ return TradeFee(percent=self.estimate_fee_pct(is_maker))
+
+ async def _iter_user_event_queue(self) -> AsyncIterable[Dict[str, any]]:
+ while True:
+ try:
+ yield await self._user_stream_tracker.user_stream.get()
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ self.logger().network(
+ "Unknown error. Retrying after 1 seconds.", exc_info=True,
+ app_warning_msg=(f"Could not fetch user events from {Constants.EXCHANGE_NAME}. "
+ "Check API key and network connection."))
+ await asyncio.sleep(1.0)
+
+ async def _user_stream_event_listener(self):
+ """
+ Listens to message in _user_stream_tracker.user_stream queue. The messages are put in by
+ CoinzoomAPIUserStreamDataSource.
+ """
+ async for event_message in self._iter_user_event_queue():
+ try:
+ event_methods = [
+ Constants.WS_METHODS["USER_ORDERS"],
+ Constants.WS_METHODS["USER_ORDERS_CANCEL"],
+ ]
+
+ msg_keys = list(event_message.keys()) if event_message is not None else []
+
+ method_key = [key for key in msg_keys if key in event_methods]
+
+ if len(method_key) != 1:
+ continue
+
+ method: str = method_key[0]
+
+ if method == Constants.WS_METHODS["USER_ORDERS"]:
+ self._process_order_message(event_message[method])
+ elif method == Constants.WS_METHODS["USER_ORDERS_CANCEL"]:
+ self._process_order_message(event_message[method])
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ self.logger().error("Unexpected error in user stream listener loop.", exc_info=True)
+ await asyncio.sleep(5.0)
+
+ # This is currently unused, but looks like a future addition.
+ async def get_open_orders(self) -> List[OpenOrder]:
+ tracked_orders = list(self._in_flight_orders.values())
+ api_params = {
+ 'symbol': None,
+ 'orderSide': None,
+ 'orderStatuses': ["NEW", "PARTIALLY_FILLED"],
+ 'size': 500,
+ 'bookmarkOrderId': None
+ }
+ result = await self._api_request("POST", Constants.ENDPOINT["USER_ORDERS"], api_params, is_auth_required=True)
+ ret_val = []
+ for order in result:
+ exchange_order_id = str(order["id"])
+ # CoinZoom doesn't support client order ids yet so we must find it from the tracked orders.
+ track_order = [o for o in tracked_orders if exchange_order_id == o.exchange_order_id]
+ if not track_order or len(track_order) < 1:
+ # Skip untracked orders
+ continue
+ client_order_id = track_order[0].client_order_id
+ # if Constants.HBOT_BROKER_ID not in order["clientOrderId"]:
+ # continue
+ if order["orderType"] != OrderType.LIMIT.name.upper():
+ self.logger().info(f"Unsupported order type found: {order['type']}")
+ # Skip and report non-limit orders
+ continue
+ ret_val.append(
+ OpenOrder(
+ client_order_id=client_order_id,
+ trading_pair=convert_from_exchange_trading_pair(order["symbol"]),
+ price=Decimal(str(order["price"])),
+ amount=Decimal(str(order["quantity"])),
+ executed_amount=Decimal(str(order["cumQuantity"])),
+ status=order["orderStatus"],
+ order_type=OrderType.LIMIT,
+ is_buy=True if order["orderSide"].lower() == TradeType.BUY.name.lower() else False,
+ time=str_date_to_ts(order["timestamp"]),
+ exchange_order_id=order["id"]
+ )
+ )
+ return ret_val
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_in_flight_order.py b/hummingbot/connector/exchange/coinzoom/coinzoom_in_flight_order.py
new file mode 100644
index 0000000000..61da8fdb0b
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_in_flight_order.py
@@ -0,0 +1,163 @@
+from decimal import Decimal
+from typing import (
+ Any,
+ Dict,
+ Optional,
+)
+import asyncio
+from hummingbot.core.event.events import (
+ OrderType,
+ TradeType
+)
+from hummingbot.connector.in_flight_order_base import InFlightOrderBase
+
+s_decimal_0 = Decimal(0)
+
+
+class CoinzoomInFlightOrder(InFlightOrderBase):
+ def __init__(self,
+ client_order_id: str,
+ exchange_order_id: Optional[str],
+ trading_pair: str,
+ order_type: OrderType,
+ trade_type: TradeType,
+ price: Decimal,
+ amount: Decimal,
+ initial_state: str = "NEW"):
+ super().__init__(
+ client_order_id,
+ exchange_order_id,
+ trading_pair,
+ order_type,
+ trade_type,
+ price,
+ amount,
+ initial_state,
+ )
+ self.trade_id_set = set()
+ self.cancelled_event = asyncio.Event()
+
+ @property
+ def is_done(self) -> bool:
+ return self.last_state in {"FILLED", "CANCELLED", "REJECTED"}
+
+ @property
+ def is_failure(self) -> bool:
+ return self.last_state in {"REJECTED"}
+
+ @property
+ def is_cancelled(self) -> bool:
+ return self.last_state in {"CANCELLED"}
+
+ @classmethod
+ def from_json(cls, data: Dict[str, Any]) -> InFlightOrderBase:
+ """
+ :param data: json data from API
+ :return: formatted InFlightOrder
+ """
+ retval = CoinzoomInFlightOrder(
+ data["client_order_id"],
+ data["exchange_order_id"],
+ data["trading_pair"],
+ getattr(OrderType, data["order_type"]),
+ getattr(TradeType, data["trade_type"]),
+ Decimal(data["price"]),
+ Decimal(data["amount"]),
+ data["last_state"]
+ )
+ retval.executed_amount_base = Decimal(data["executed_amount_base"])
+ retval.executed_amount_quote = Decimal(data["executed_amount_quote"])
+ retval.fee_asset = data["fee_asset"]
+ retval.fee_paid = Decimal(data["fee_paid"])
+ retval.last_state = data["last_state"]
+ return retval
+
+ def update_with_order_update(self, order_update: Dict[str, Any]) -> bool:
+ """
+ Updates the in flight order with order update (from private/get-order-detail end point)
+ return: True if the order gets updated otherwise False
+ Example Orders:
+ REST request
+ {
+ "id" : "977f82aa-23dc-4c8b-982c-2ee7d2002882",
+ "clientOrderId" : null,
+ "symbol" : "BTC/USD",
+ "orderType" : "LIMIT",
+ "orderSide" : "BUY",
+ "quantity" : 0.1,
+ "price" : 54570,
+ "payFeesWithZoomToken" : false,
+ "orderStatus" : "PARTIALLY_FILLED",
+ "timestamp" : "2021-03-24T04:07:26.260253Z",
+ "executions" :
+ [
+ {
+ "id" : "38761582-2b37-4e27-a561-434981d21a96",
+ "executionType" : "PARTIAL_FILL",
+ "orderStatus" : "PARTIALLY_FILLED",
+ "lastPrice" : 54570,
+ "averagePrice" : 54570,
+ "lastQuantity" : 0.01,
+ "leavesQuantity" : 0.09,
+ "cumulativeQuantity" : 0.01,
+ "rejectReason" : null,
+ "timestamp" : "2021-03-24T04:07:44.503222Z"
+ }
+ ]
+ }
+ WS request
+ {
+ 'id': '4eb3f26c-91bd-4bd2-bacb-15b2f432c452',
+ 'orderId': '962a2a54-fbcf-4d89-8f37-a8854020a823',
+ 'symbol': 'BTC/USD', 'orderType': 'LIMIT',
+ 'orderSide': 'BUY',
+ 'price': 5000,
+ 'quantity': 0.001,
+ 'executionType': 'CANCEL',
+ 'orderStatus': 'CANCELLED',
+ 'lastQuantity': 0,
+ 'leavesQuantity': 0,
+ 'cumulativeQuantity': 0,
+ 'transactTime': '2021-03-23T19:06:51.155520Z'
+ }
+ """
+ # Update order execution status
+ self.last_state = order_update["orderStatus"]
+
+ if 'cumulativeQuantity' not in order_update and 'executions' not in order_update:
+ return False
+
+ trades = order_update.get('executions')
+ if trades is not None:
+ new_trades = False
+ for trade in trades:
+ trade_id = str(trade["timestamp"])
+ if trade_id not in self.trade_id_set:
+ self.trade_id_set.add(trade_id)
+ order_update["exchange_trade_id"] = trade.get("id")
+ # Add executed amounts
+ executed_price = Decimal(str(trade.get("lastPrice", "0")))
+ self.executed_amount_base += Decimal(str(trade["lastQuantity"]))
+ self.executed_amount_quote += executed_price * self.executed_amount_base
+ # Set new trades flag
+ new_trades = True
+ if not new_trades:
+ # trades already recorded
+ return False
+ else:
+ trade_id = str(order_update["transactTime"])
+ if trade_id in self.trade_id_set:
+ # trade already recorded
+ return False
+ self.trade_id_set.add(trade_id)
+ # Set executed amounts
+ executed_price = Decimal(str(order_update.get("averagePrice", order_update.get("price", "0"))))
+ self.executed_amount_base = Decimal(str(order_update["cumulativeQuantity"]))
+ self.executed_amount_quote = executed_price * self.executed_amount_base
+ if self.executed_amount_base <= s_decimal_0:
+ # No trades executed yet.
+ return False
+ self.fee_paid += order_update.get("trade_fee") * self.executed_amount_base
+ if not self.fee_asset:
+ self.fee_asset = self.quote_asset
+ return True
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_order_book.py b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book.py
new file mode 100644
index 0000000000..e771d48cf3
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book.py
@@ -0,0 +1,148 @@
+#!/usr/bin/env python
+
+import logging
+from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants
+
+from sqlalchemy.engine import RowProxy
+from typing import (
+ Optional,
+ Dict,
+ List, Any)
+from hummingbot.logger import HummingbotLogger
+from hummingbot.core.data_type.order_book import OrderBook
+from hummingbot.core.data_type.order_book_message import (
+ OrderBookMessage, OrderBookMessageType
+)
+from hummingbot.connector.exchange.coinzoom.coinzoom_order_book_message import CoinzoomOrderBookMessage
+from .coinzoom_utils import (
+ convert_from_exchange_trading_pair,
+ str_date_to_ts,
+)
+
+_logger = None
+
+
+class CoinzoomOrderBook(OrderBook):
+ @classmethod
+ def logger(cls) -> HummingbotLogger:
+ global _logger
+ if _logger is None:
+ _logger = logging.getLogger(__name__)
+ return _logger
+
+ @classmethod
+ def snapshot_message_from_exchange(cls,
+ msg: Dict[str, any],
+ timestamp: float,
+ metadata: Optional[Dict] = None):
+ """
+ Convert json snapshot data into standard OrderBookMessage format
+ :param msg: json snapshot data from live web socket stream
+ :param timestamp: timestamp attached to incoming data
+ :return: CoinzoomOrderBookMessage
+ """
+
+ if metadata:
+ msg.update(metadata)
+
+ return CoinzoomOrderBookMessage(
+ message_type=OrderBookMessageType.SNAPSHOT,
+ content=msg,
+ timestamp=timestamp
+ )
+
+ @classmethod
+ def snapshot_message_from_db(cls, record: RowProxy, metadata: Optional[Dict] = None):
+ """
+ *used for backtesting
+ Convert a row of snapshot data into standard OrderBookMessage format
+ :param record: a row of snapshot data from the database
+ :return: CoinzoomOrderBookMessage
+ """
+ return CoinzoomOrderBookMessage(
+ message_type=OrderBookMessageType.SNAPSHOT,
+ content=record.json,
+ timestamp=record.timestamp
+ )
+
+ @classmethod
+ def diff_message_from_exchange(cls,
+ msg: Dict[str, any],
+ timestamp: Optional[float] = None,
+ metadata: Optional[Dict] = None):
+ """
+ Convert json diff data into standard OrderBookMessage format
+ :param msg: json diff data from live web socket stream
+ :param timestamp: timestamp attached to incoming data
+ :return: CoinzoomOrderBookMessage
+ """
+
+ if metadata:
+ msg.update(metadata)
+
+ return CoinzoomOrderBookMessage(
+ message_type=OrderBookMessageType.DIFF,
+ content=msg,
+ timestamp=timestamp
+ )
+
+ @classmethod
+ def diff_message_from_db(cls, record: RowProxy, metadata: Optional[Dict] = None):
+ """
+ *used for backtesting
+ Convert a row of diff data into standard OrderBookMessage format
+ :param record: a row of diff data from the database
+ :return: CoinzoomOrderBookMessage
+ """
+ return CoinzoomOrderBookMessage(
+ message_type=OrderBookMessageType.DIFF,
+ content=record.json,
+ timestamp=record.timestamp
+ )
+
+ @classmethod
+ def trade_message_from_exchange(cls,
+ msg: Dict[str, Any],
+ timestamp: Optional[float] = None,
+ metadata: Optional[Dict] = None):
+ """
+ Convert a trade data into standard OrderBookMessage format
+ :param record: a trade data from the database
+ :return: CoinzoomOrderBookMessage
+ """
+
+ trade_msg = {
+ "trade_type": msg[4],
+ "price": msg[1],
+ "amount": msg[2],
+ "trading_pair": convert_from_exchange_trading_pair(msg[0])
+ }
+ trade_timestamp = str_date_to_ts(msg[3])
+
+ return CoinzoomOrderBookMessage(
+ message_type=OrderBookMessageType.TRADE,
+ content=trade_msg,
+ timestamp=trade_timestamp
+ )
+
+ @classmethod
+ def trade_message_from_db(cls, record: RowProxy, metadata: Optional[Dict] = None):
+ """
+ *used for backtesting
+ Convert a row of trade data into standard OrderBookMessage format
+ :param record: a row of trade data from the database
+ :return: CoinzoomOrderBookMessage
+ """
+ return CoinzoomOrderBookMessage(
+ message_type=OrderBookMessageType.TRADE,
+ content=record.json,
+ timestamp=record.timestamp
+ )
+
+ @classmethod
+ def from_snapshot(cls, snapshot: OrderBookMessage):
+ raise NotImplementedError(Constants.EXCHANGE_NAME + " order book needs to retain individual order data.")
+
+ @classmethod
+ def restore_from_snapshot_and_diffs(self, snapshot: OrderBookMessage, diffs: List[OrderBookMessage]):
+ raise NotImplementedError(Constants.EXCHANGE_NAME + " order book needs to retain individual order data.")
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_message.py b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_message.py
new file mode 100644
index 0000000000..d6bc00541d
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_message.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+
+from typing import (
+ Dict,
+ Optional,
+)
+
+from hummingbot.core.data_type.order_book_message import (
+ OrderBookMessage,
+ OrderBookMessageType,
+)
+from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants
+
+
+class CoinzoomOrderBookMessage(OrderBookMessage):
+ def __new__(
+ cls,
+ message_type: OrderBookMessageType,
+ content: Dict[str, any],
+ timestamp: Optional[float] = None,
+ *args,
+ **kwargs,
+ ):
+ if timestamp is None:
+ if message_type is OrderBookMessageType.SNAPSHOT:
+ raise ValueError("timestamp must not be None when initializing snapshot messages.")
+ timestamp = content["timestamp"]
+
+ return super(CoinzoomOrderBookMessage, cls).__new__(
+ cls, message_type, content, timestamp=timestamp, *args, **kwargs
+ )
+
+ @property
+ def update_id(self) -> int:
+ if self.type in [OrderBookMessageType.DIFF, OrderBookMessageType.SNAPSHOT]:
+ return self.timestamp
+ else:
+ return -1
+
+ @property
+ def trade_id(self) -> int:
+ if self.type is OrderBookMessageType.TRADE:
+ return self.timestamp
+ return -1
+
+ @property
+ def trading_pair(self) -> str:
+ return self.content["trading_pair"]
+
+ # The `asks` and `bids` properties are only used in the methods below.
+ # They are all replaced or unused in this connector:
+ # OrderBook.restore_from_snapshot_and_diffs
+ # OrderBookTracker._track_single_book
+ # MockAPIOrderBookDataSource.get_tracking_pairs
+ @property
+ def asks(self):
+ raise NotImplementedError(Constants.EXCHANGE_NAME + " order book uses active_order_tracker.")
+
+ @property
+ def bids(self):
+ raise NotImplementedError(Constants.EXCHANGE_NAME + " order book uses active_order_tracker.")
+
+ def __eq__(self, other) -> bool:
+ return self.type == other.type and self.timestamp == other.timestamp
+
+ def __lt__(self, other) -> bool:
+ if self.timestamp != other.timestamp:
+ return self.timestamp < other.timestamp
+ else:
+ """
+ If timestamp is the same, the ordering is snapshot < diff < trade
+ """
+ return self.type.value < other.type.value
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_tracker.py b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_tracker.py
new file mode 100644
index 0000000000..c81ca9a7bd
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_tracker.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+import asyncio
+import bisect
+import logging
+from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants
+import time
+
+from collections import defaultdict, deque
+from typing import Optional, Dict, List, Deque
+from hummingbot.core.data_type.order_book_message import OrderBookMessageType
+from hummingbot.logger import HummingbotLogger
+from hummingbot.core.data_type.order_book_tracker import OrderBookTracker
+from hummingbot.connector.exchange.coinzoom.coinzoom_order_book_message import CoinzoomOrderBookMessage
+from hummingbot.connector.exchange.coinzoom.coinzoom_active_order_tracker import CoinzoomActiveOrderTracker
+from hummingbot.connector.exchange.coinzoom.coinzoom_api_order_book_data_source import CoinzoomAPIOrderBookDataSource
+from hummingbot.connector.exchange.coinzoom.coinzoom_order_book import CoinzoomOrderBook
+
+
+class CoinzoomOrderBookTracker(OrderBookTracker):
+ _logger: Optional[HummingbotLogger] = None
+
+ @classmethod
+ def logger(cls) -> HummingbotLogger:
+ if cls._logger is None:
+ cls._logger = logging.getLogger(__name__)
+ return cls._logger
+
+ def __init__(self, trading_pairs: Optional[List[str]] = None,):
+ super().__init__(CoinzoomAPIOrderBookDataSource(trading_pairs), trading_pairs)
+
+ self._ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop()
+ self._order_book_snapshot_stream: asyncio.Queue = asyncio.Queue()
+ self._order_book_diff_stream: asyncio.Queue = asyncio.Queue()
+ self._order_book_trade_stream: asyncio.Queue = asyncio.Queue()
+ self._process_msg_deque_task: Optional[asyncio.Task] = None
+ self._past_diffs_windows: Dict[str, Deque] = {}
+ self._order_books: Dict[str, CoinzoomOrderBook] = {}
+ self._saved_message_queues: Dict[str, Deque[CoinzoomOrderBookMessage]] = \
+ defaultdict(lambda: deque(maxlen=1000))
+ self._active_order_trackers: Dict[str, CoinzoomActiveOrderTracker] = defaultdict(CoinzoomActiveOrderTracker)
+ self._order_book_stream_listener_task: Optional[asyncio.Task] = None
+ self._order_book_trade_listener_task: Optional[asyncio.Task] = None
+
+ @property
+ def exchange_name(self) -> str:
+ """
+ Name of the current exchange
+ """
+ return Constants.EXCHANGE_NAME
+
+ async def _track_single_book(self, trading_pair: str):
+ """
+ Update an order book with changes from the latest batch of received messages
+ """
+ past_diffs_window: Deque[CoinzoomOrderBookMessage] = deque()
+ self._past_diffs_windows[trading_pair] = past_diffs_window
+
+ message_queue: asyncio.Queue = self._tracking_message_queues[trading_pair]
+ order_book: CoinzoomOrderBook = self._order_books[trading_pair]
+ active_order_tracker: CoinzoomActiveOrderTracker = self._active_order_trackers[trading_pair]
+
+ last_message_timestamp: float = time.time()
+ diff_messages_accepted: int = 0
+
+ while True:
+ try:
+ message: CoinzoomOrderBookMessage = None
+ saved_messages: Deque[CoinzoomOrderBookMessage] = self._saved_message_queues[trading_pair]
+ # Process saved messages first if there are any
+ if len(saved_messages) > 0:
+ message = saved_messages.popleft()
+ else:
+ message = await message_queue.get()
+
+ if message.type is OrderBookMessageType.DIFF:
+ bids, asks = active_order_tracker.convert_diff_message_to_order_book_row(message)
+ order_book.apply_diffs(bids, asks, message.update_id)
+ past_diffs_window.append(message)
+ while len(past_diffs_window) > self.PAST_DIFF_WINDOW_SIZE:
+ past_diffs_window.popleft()
+ diff_messages_accepted += 1
+
+ # Output some statistics periodically.
+ now: float = time.time()
+ if int(now / 60.0) > int(last_message_timestamp / 60.0):
+ self.logger().debug(f"Processed {diff_messages_accepted} order book diffs for {trading_pair}.")
+ diff_messages_accepted = 0
+ last_message_timestamp = now
+ elif message.type is OrderBookMessageType.SNAPSHOT:
+ past_diffs: List[CoinzoomOrderBookMessage] = list(past_diffs_window)
+ # only replay diffs later than snapshot, first update active order with snapshot then replay diffs
+ replay_position = bisect.bisect_right(past_diffs, message)
+ replay_diffs = past_diffs[replay_position:]
+ s_bids, s_asks = active_order_tracker.convert_snapshot_message_to_order_book_row(message)
+ order_book.apply_snapshot(s_bids, s_asks, message.update_id)
+ for diff_message in replay_diffs:
+ d_bids, d_asks = active_order_tracker.convert_diff_message_to_order_book_row(diff_message)
+ order_book.apply_diffs(d_bids, d_asks, diff_message.update_id)
+
+ self.logger().debug(f"Processed order book snapshot for {trading_pair}.")
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ self.logger().network(
+ f"Unexpected error processing order book messages for {trading_pair}.",
+ exc_info=True,
+ app_warning_msg="Unexpected error processing order book messages. Retrying after 5 seconds."
+ )
+ await asyncio.sleep(5.0)
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_tracker_entry.py b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_tracker_entry.py
new file mode 100644
index 0000000000..94feda5275
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_order_book_tracker_entry.py
@@ -0,0 +1,21 @@
+from hummingbot.core.data_type.order_book import OrderBook
+from hummingbot.core.data_type.order_book_tracker_entry import OrderBookTrackerEntry
+from hummingbot.connector.exchange.coinzoom.coinzoom_active_order_tracker import CoinzoomActiveOrderTracker
+
+
+class CoinzoomOrderBookTrackerEntry(OrderBookTrackerEntry):
+ def __init__(
+ self, trading_pair: str, timestamp: float, order_book: OrderBook, active_order_tracker: CoinzoomActiveOrderTracker
+ ):
+ self._active_order_tracker = active_order_tracker
+ super(CoinzoomOrderBookTrackerEntry, self).__init__(trading_pair, timestamp, order_book)
+
+ def __repr__(self) -> str:
+ return (
+ f"CoinzoomOrderBookTrackerEntry(trading_pair='{self._trading_pair}', timestamp='{self._timestamp}', "
+ f"order_book='{self._order_book}')"
+ )
+
+ @property
+ def active_order_tracker(self) -> CoinzoomActiveOrderTracker:
+ return self._active_order_tracker
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_user_stream_tracker.py b/hummingbot/connector/exchange/coinzoom/coinzoom_user_stream_tracker.py
new file mode 100644
index 0000000000..79c7584d70
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_user_stream_tracker.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+
+import asyncio
+import logging
+from typing import (
+ Optional,
+ List,
+)
+from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource
+from hummingbot.logger import HummingbotLogger
+from hummingbot.core.data_type.user_stream_tracker import (
+ UserStreamTracker
+)
+from hummingbot.core.utils.async_utils import (
+ safe_ensure_future,
+ safe_gather,
+)
+from hummingbot.connector.exchange.coinzoom.coinzoom_api_user_stream_data_source import \
+ CoinzoomAPIUserStreamDataSource
+from hummingbot.connector.exchange.coinzoom.coinzoom_auth import CoinzoomAuth
+from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants
+
+
+class CoinzoomUserStreamTracker(UserStreamTracker):
+ _cbpust_logger: Optional[HummingbotLogger] = None
+
+ @classmethod
+ def logger(cls) -> HummingbotLogger:
+ if cls._bust_logger is None:
+ cls._bust_logger = logging.getLogger(__name__)
+ return cls._bust_logger
+
+ def __init__(self,
+ coinzoom_auth: Optional[CoinzoomAuth] = None,
+ trading_pairs: Optional[List[str]] = []):
+ super().__init__()
+ self._coinzoom_auth: CoinzoomAuth = coinzoom_auth
+ self._trading_pairs: List[str] = trading_pairs
+ self._ev_loop: asyncio.events.AbstractEventLoop = asyncio.get_event_loop()
+ self._data_source: Optional[UserStreamTrackerDataSource] = None
+ self._user_stream_tracking_task: Optional[asyncio.Task] = None
+
+ @property
+ def data_source(self) -> UserStreamTrackerDataSource:
+ """
+ *required
+ Initializes a user stream data source (user specific order diffs from live socket stream)
+ :return: OrderBookTrackerDataSource
+ """
+ if not self._data_source:
+ self._data_source = CoinzoomAPIUserStreamDataSource(
+ coinzoom_auth=self._coinzoom_auth,
+ trading_pairs=self._trading_pairs
+ )
+ return self._data_source
+
+ @property
+ def exchange_name(self) -> str:
+ """
+ *required
+ Name of the current exchange
+ """
+ return Constants.EXCHANGE_NAME
+
+ async def start(self):
+ """
+ *required
+ Start all listeners and tasks
+ """
+ self._user_stream_tracking_task = safe_ensure_future(
+ self.data_source.listen_for_user_stream(self._ev_loop, self._user_stream)
+ )
+ await safe_gather(self._user_stream_tracking_task)
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_utils.py b/hummingbot/connector/exchange/coinzoom/coinzoom_utils.py
new file mode 100644
index 0000000000..498ed56541
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_utils.py
@@ -0,0 +1,149 @@
+import aiohttp
+import asyncio
+import random
+from dateutil.parser import parse as dateparse
+from typing import (
+ Any,
+ Dict,
+ Optional,
+)
+
+from hummingbot.core.utils.tracking_nonce import get_tracking_nonce
+from hummingbot.client.config.config_var import ConfigVar
+from hummingbot.client.config.config_methods import using_exchange
+from .coinzoom_constants import Constants
+
+
+CENTRALIZED = True
+
+EXAMPLE_PAIR = "BTC-USD"
+
+DEFAULT_FEES = [0.2, 0.26]
+
+
+class CoinzoomAPIError(IOError):
+ def __init__(self, error_payload: Dict[str, Any]):
+ super().__init__(str(error_payload))
+ self.error_payload = error_payload
+
+
+# convert date string to timestamp
+def str_date_to_ts(date: str) -> int:
+ return int(dateparse(date).timestamp() * 1e3)
+
+
+# Request ID class
+class RequestId:
+ """
+ Generate request ids
+ """
+ _request_id: int = 0
+
+ @classmethod
+ def generate_request_id(cls) -> int:
+ return get_tracking_nonce()
+
+
+def convert_from_exchange_trading_pair(ex_trading_pair: str) -> Optional[str]:
+ # CoinZoom uses uppercase (BTC/USDT)
+ return ex_trading_pair.replace("/", "-")
+
+
+def convert_to_exchange_trading_pair(hb_trading_pair: str, alternative: bool = False) -> str:
+ # CoinZoom uses uppercase (BTCUSDT)
+ if alternative:
+ return hb_trading_pair.replace("-", "_").upper()
+ else:
+ return hb_trading_pair.replace("-", "/").upper()
+
+
+def get_new_client_order_id(is_buy: bool, trading_pair: str) -> str:
+ side = "B" if is_buy else "S"
+ symbols = trading_pair.split("-")
+ base = symbols[0].upper()
+ quote = symbols[1].upper()
+ base_str = f"{base[0]}{base[-1]}"
+ quote_str = f"{quote[0]}{quote[-1]}"
+ return f"{Constants.HBOT_BROKER_ID}{side}{base_str}{quote_str}{get_tracking_nonce()}"
+
+
+def retry_sleep_time(try_count: int) -> float:
+ random.seed()
+ randSleep = 1 + float(random.randint(1, 10) / 100)
+ return float(2 + float(randSleep * (1 + (try_count ** try_count))))
+
+
+async def aiohttp_response_with_errors(request_coroutine):
+ http_status, parsed_response, request_errors = None, None, False
+ try:
+ async with request_coroutine as response:
+ http_status = response.status
+ try:
+ parsed_response = await response.json()
+ except Exception:
+ if response.status not in [204]:
+ request_errors = True
+ try:
+ parsed_response = str(await response.read())
+ if len(parsed_response) > 100:
+ parsed_response = f"{parsed_response[:100]} ... (truncated)"
+ except Exception:
+ pass
+ TempFailure = (parsed_response is None or
+ (response.status not in [200, 201, 204] and "error" not in parsed_response))
+ if TempFailure:
+ parsed_response = response.reason if parsed_response is None else parsed_response
+ request_errors = True
+ except Exception:
+ request_errors = True
+ return http_status, parsed_response, request_errors
+
+
+async def api_call_with_retries(method,
+ endpoint,
+ params: Optional[Dict[str, Any]] = None,
+ shared_client=None,
+ try_count: int = 0) -> Dict[str, Any]:
+ url = f"{Constants.REST_URL}/{endpoint}"
+ headers = {"Content-Type": "application/json", "User-Agent": "hummingbot"}
+ http_client = shared_client if shared_client is not None else aiohttp.ClientSession()
+ # Build request coro
+ response_coro = http_client.request(method=method.upper(), url=url, headers=headers,
+ params=params, timeout=Constants.API_CALL_TIMEOUT)
+ http_status, parsed_response, request_errors = await aiohttp_response_with_errors(response_coro)
+ if shared_client is None:
+ await http_client.close()
+ if request_errors or parsed_response is None:
+ if try_count < Constants.API_MAX_RETRIES:
+ try_count += 1
+ time_sleep = retry_sleep_time(try_count)
+ print(f"Error fetching data from {url}. HTTP status is {http_status}. "
+ f"Retrying in {time_sleep:.0f}s.")
+ await asyncio.sleep(time_sleep)
+ return await api_call_with_retries(method=method, endpoint=endpoint, params=params,
+ shared_client=shared_client, try_count=try_count)
+ else:
+ raise CoinzoomAPIError({"error": parsed_response, "status": http_status})
+ return parsed_response
+
+
+KEYS = {
+ "coinzoom_api_key":
+ ConfigVar(key="coinzoom_api_key",
+ prompt=f"Enter your {Constants.EXCHANGE_NAME} API key >>> ",
+ required_if=using_exchange("coinzoom"),
+ is_secure=True,
+ is_connect_key=True),
+ "coinzoom_secret_key":
+ ConfigVar(key="coinzoom_secret_key",
+ prompt=f"Enter your {Constants.EXCHANGE_NAME} secret key >>> ",
+ required_if=using_exchange("coinzoom"),
+ is_secure=True,
+ is_connect_key=True),
+ "coinzoom_username":
+ ConfigVar(key="coinzoom_username",
+ prompt=f"Enter your {Constants.EXCHANGE_NAME} ZoomMe username >>> ",
+ required_if=using_exchange("coinzoom"),
+ is_secure=True,
+ is_connect_key=True),
+}
diff --git a/hummingbot/connector/exchange/coinzoom/coinzoom_websocket.py b/hummingbot/connector/exchange/coinzoom/coinzoom_websocket.py
new file mode 100644
index 0000000000..1e233e2054
--- /dev/null
+++ b/hummingbot/connector/exchange/coinzoom/coinzoom_websocket.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+import asyncio
+import logging
+import websockets
+import json
+from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants
+
+
+from typing import (
+ Any,
+ AsyncIterable,
+ Dict,
+ List,
+ Optional,
+)
+from websockets.exceptions import ConnectionClosed
+from hummingbot.logger import HummingbotLogger
+from hummingbot.connector.exchange.coinzoom.coinzoom_auth import CoinzoomAuth
+
+# reusable websocket class
+# ToDo: We should eventually remove this class, and instantiate web socket connection normally (see Binance for example)
+
+
+class CoinzoomWebsocket():
+ _logger: Optional[HummingbotLogger] = None
+
+ @classmethod
+ def logger(cls) -> HummingbotLogger:
+ if cls._logger is None:
+ cls._logger = logging.getLogger(__name__)
+ return cls._logger
+
+ def __init__(self,
+ auth: Optional[CoinzoomAuth] = None):
+ self._auth: Optional[CoinzoomAuth] = auth
+ self._isPrivate = True if self._auth is not None else False
+ self._WS_URL = Constants.WS_PRIVATE_URL if self._isPrivate else Constants.WS_PUBLIC_URL
+ self._client: Optional[websockets.WebSocketClientProtocol] = None
+ self._is_subscribed = False
+
+ @property
+ def is_subscribed(self):
+ return self._is_subscribed
+
+ # connect to exchange
+ async def connect(self):
+ # if auth class was passed into websocket class
+ # we need to emit authenticated requests
+ extra_headers = self._auth.get_headers() if self._isPrivate else None
+ self._client = await websockets.connect(self._WS_URL, extra_headers=extra_headers)
+
+ return self._client
+
+ # disconnect from exchange
+ async def disconnect(self):
+ if self._client is None:
+ return
+
+ await self._client.close()
+
+ # receive & parse messages
+ async def _messages(self) -> AsyncIterable[Any]:
+ try:
+ while True:
+ try:
+ raw_msg_str: str = await asyncio.wait_for(self._client.recv(), timeout=Constants.MESSAGE_TIMEOUT)
+ try:
+ msg = json.loads(raw_msg_str)
+
+ # CoinZoom doesn't support ping or heartbeat messages.
+ # Can handle them here if that changes - use `safe_ensure_future`.
+
+ # Check response for a subscribed/unsubscribed message;
+ result: List[str] = list([d['result']
+ for k, d in msg.items()
+ if (isinstance(d, dict) and d.get('result') is not None)])
+
+ if len(result):
+ if result[0] == 'subscribed':
+ self._is_subscribed = True
+ elif result[0] == 'unsubscribed':
+ self._is_subscribed = False
+ yield None
+ else:
+ yield msg
+ except ValueError:
+ continue
+ except asyncio.TimeoutError:
+ await asyncio.wait_for(self._client.ping(), timeout=Constants.PING_TIMEOUT)
+ except asyncio.TimeoutError:
+ self.logger().warning("WebSocket ping timed out. Going to reconnect...")
+ return
+ except ConnectionClosed:
+ return
+ finally:
+ await self.disconnect()
+
+ # emit messages
+ async def _emit(self, method: str, action: str, data: Optional[Dict[str, Any]] = {}) -> int:
+ payload = {
+ method: {
+ "action": action,
+ **data
+ }
+ }
+ return await self._client.send(json.dumps(payload))
+
+ # request via websocket
+ async def request(self, method: str, action: str, data: Optional[Dict[str, Any]] = {}) -> int:
+ return await self._emit(method, action, data)
+
+ # subscribe to a method
+ async def subscribe(self,
+ streams: Optional[Dict[str, Any]] = {}) -> int:
+ for stream, stream_dict in streams.items():
+ if self._isPrivate:
+ stream_dict = {**stream_dict, **self._auth.get_ws_params()}
+ await self.request(stream, "subscribe", stream_dict)
+ return True
+
+ # unsubscribe to a method
+ async def unsubscribe(self,
+ streams: Optional[Dict[str, Any]] = {}) -> int:
+ for stream, stream_dict in streams.items():
+ if self._isPrivate:
+ stream_dict = {**stream_dict, **self._auth.get_ws_params()}
+ await self.request(stream, "unsubscribe", stream_dict)
+ return True
+
+ # listen to messages by method
+ async def on_message(self) -> AsyncIterable[Any]:
+ async for msg in self._messages():
+ yield msg
diff --git a/hummingbot/connector/exchange/digifinex/digifinex_rest_api.py b/hummingbot/connector/exchange/digifinex/digifinex_rest_api.py
index 1a7f89469a..34ca77afb5 100644
--- a/hummingbot/connector/exchange/digifinex/digifinex_rest_api.py
+++ b/hummingbot/connector/exchange/digifinex/digifinex_rest_api.py
@@ -33,6 +33,7 @@ async def request(self,
headers = self._auth.get_private_headers(path_url, request_id, params)
else:
headers = {}
+ headers['User-Agent'] = 'hummingbot'
if method == "get":
url = f'{url}?{urllib.parse.urlencode(params)}'
diff --git a/hummingbot/strategy/perpetual_market_making/perpetual_market_making.pyx b/hummingbot/strategy/perpetual_market_making/perpetual_market_making.pyx
index 7a842f6d2e..b5a635b37c 100644
--- a/hummingbot/strategy/perpetual_market_making/perpetual_market_making.pyx
+++ b/hummingbot/strategy/perpetual_market_making/perpetual_market_making.pyx
@@ -661,7 +661,7 @@ cdef class PerpetualMarketMakingStrategy(StrategyBase):
for position in active_positions:
if (ask_price > position.entry_price and position.amount > 0) or (bid_price < position.entry_price and position.amount < 0):
# check if there is an active order to take profit, and create if none exists
- profit_spread = self._long_profit_taking_spread if position.amount < 0 else self._short_profit_taking_spread
+ profit_spread = self._long_profit_taking_spread if position.amount > 0 else self._short_profit_taking_spread
take_profit_price = position.entry_price * (Decimal("1") + profit_spread) if position.amount > 0 \
else position.entry_price * (Decimal("1") - profit_spread)
price = market.c_quantize_order_price(self.trading_pair, take_profit_price)
@@ -675,10 +675,10 @@ cdef class PerpetualMarketMakingStrategy(StrategyBase):
size = market.c_quantize_order_amount(self.trading_pair, abs(position.amount))
if size > 0 and price > 0:
if position.amount < 0:
- self.logger().info(f"Creating profit taking buy order to lock profit on long position.")
+ self.logger().info(f"Creating profit taking buy order to lock profit on short position.")
buys.append(PriceSize(price, size))
else:
- self.logger().info(f"Creating profit taking sell order to lock profit on short position.")
+ self.logger().info(f"Creating profit taking sell order to lock profit on long position.")
sells.append(PriceSize(price, size))
return Proposal(buys, sells)
diff --git a/hummingbot/templates/conf_fee_overrides_TEMPLATE.yml b/hummingbot/templates/conf_fee_overrides_TEMPLATE.yml
index fa3324522f..b8e79b7993 100644
--- a/hummingbot/templates/conf_fee_overrides_TEMPLATE.yml
+++ b/hummingbot/templates/conf_fee_overrides_TEMPLATE.yml
@@ -17,6 +17,9 @@ beaxy_taker_fee:
coinbase_pro_maker_fee:
coinbase_pro_taker_fee:
+coinzoom_maker_fee:
+coinzoom_taker_fee:
+
dydx_maker_fee:
dydx_taker_fee:
diff --git a/hummingbot/templates/conf_global_TEMPLATE.yml b/hummingbot/templates/conf_global_TEMPLATE.yml
index 84946fb27f..ad833aa312 100644
--- a/hummingbot/templates/conf_global_TEMPLATE.yml
+++ b/hummingbot/templates/conf_global_TEMPLATE.yml
@@ -34,6 +34,10 @@ coinbase_pro_api_key: null
coinbase_pro_secret_key: null
coinbase_pro_passphrase: null
+coinzoom_api_key: null
+coinzoom_secret_key: null
+coinzoom_username: null
+
dydx_eth_private_key: null
dydx_node_address: null
diff --git a/setup.py b/setup.py
index fc70445f2d..08ece91061 100755
--- a/setup.py
+++ b/setup.py
@@ -56,6 +56,7 @@ def main():
"hummingbot.connector.exchange.bittrex",
"hummingbot.connector.exchange.bamboo_relay",
"hummingbot.connector.exchange.coinbase_pro",
+ "hummingbot.connector.exchange.coinzoom",
"hummingbot.connector.exchange.dydx",
"hummingbot.connector.exchange.huobi",
"hummingbot.connector.exchange.radar_relay",
diff --git a/test/connector/exchange/coinzoom/.gitignore b/test/connector/exchange/coinzoom/.gitignore
new file mode 100644
index 0000000000..23d9952b8c
--- /dev/null
+++ b/test/connector/exchange/coinzoom/.gitignore
@@ -0,0 +1 @@
+backups
\ No newline at end of file
diff --git a/test/connector/exchange/coinzoom/__init__.py b/test/connector/exchange/coinzoom/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/test/connector/exchange/coinzoom/test_coinzoom_auth.py b/test/connector/exchange/coinzoom/test_coinzoom_auth.py
new file mode 100644
index 0000000000..f2573e4460
--- /dev/null
+++ b/test/connector/exchange/coinzoom/test_coinzoom_auth.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+import sys
+import asyncio
+import unittest
+import aiohttp
+import conf
+import logging
+from async_timeout import timeout
+from os.path import join, realpath
+from typing import Dict, Any
+from hummingbot.connector.exchange.coinzoom.coinzoom_auth import CoinzoomAuth
+from hummingbot.connector.exchange.coinzoom.coinzoom_websocket import CoinzoomWebsocket
+from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL
+from hummingbot.connector.exchange.coinzoom.coinzoom_constants import Constants
+
+sys.path.insert(0, realpath(join(__file__, "../../../../../")))
+logging.basicConfig(level=METRICS_LOG_LEVEL)
+
+
+class TestAuth(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop()
+ api_key = conf.coinzoom_api_key
+ secret_key = conf.coinzoom_secret_key
+ api_username = conf.coinzoom_username
+ cls.auth = CoinzoomAuth(api_key, secret_key, api_username)
+
+ async def rest_auth(self) -> Dict[Any, Any]:
+ endpoint = Constants.ENDPOINT['USER_BALANCES']
+ headers = self.auth.get_headers()
+ response = await aiohttp.ClientSession().get(f"{Constants.REST_URL}/{endpoint}", headers=headers)
+ return await response.json()
+
+ async def ws_auth(self) -> Dict[Any, Any]:
+ ws = CoinzoomWebsocket(self.auth)
+ await ws.connect()
+ user_ws_streams = {Constants.WS_SUB["USER_ORDERS_TRADES"]: {}}
+ async with timeout(30):
+ await ws.subscribe(user_ws_streams)
+ async for response in ws.on_message():
+ if ws.is_subscribed:
+ return True
+ return False
+
+ def test_rest_auth(self):
+ result = self.ev_loop.run_until_complete(self.rest_auth())
+ if len(result) == 0 or "currency" not in result[0].keys():
+ print(f"Unexpected response for API call: {result}")
+ assert "currency" in result[0].keys()
+
+ def test_ws_auth(self):
+ subscribed = self.ev_loop.run_until_complete(self.ws_auth())
+ assert subscribed is True
diff --git a/test/connector/exchange/coinzoom/test_coinzoom_exchange.py b/test/connector/exchange/coinzoom/test_coinzoom_exchange.py
new file mode 100644
index 0000000000..979b4651b6
--- /dev/null
+++ b/test/connector/exchange/coinzoom/test_coinzoom_exchange.py
@@ -0,0 +1,442 @@
+from os.path import join, realpath
+import sys; sys.path.insert(0, realpath(join(__file__, "../../../../../")))
+import asyncio
+import logging
+from decimal import Decimal
+import unittest
+import contextlib
+import time
+import os
+from typing import List
+import conf
+import math
+from async_timeout import timeout
+
+from hummingbot.core.clock import Clock, ClockMode
+from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL
+from hummingbot.core.utils.async_utils import safe_gather, safe_ensure_future
+from hummingbot.core.event.event_logger import EventLogger
+from hummingbot.core.event.events import (
+ BuyOrderCompletedEvent,
+ BuyOrderCreatedEvent,
+ MarketEvent,
+ OrderFilledEvent,
+ OrderType,
+ SellOrderCompletedEvent,
+ SellOrderCreatedEvent,
+ OrderCancelledEvent
+)
+from hummingbot.model.sql_connection_manager import (
+ SQLConnectionManager,
+ SQLConnectionType
+)
+from hummingbot.model.market_state import MarketState
+from hummingbot.model.order import Order
+from hummingbot.model.trade_fill import TradeFill
+from hummingbot.connector.markets_recorder import MarketsRecorder
+from hummingbot.connector.exchange.coinzoom.coinzoom_exchange import CoinzoomExchange
+
+logging.basicConfig(level=METRICS_LOG_LEVEL)
+
+API_KEY = conf.coinzoom_api_key
+API_SECRET = conf.coinzoom_secret_key
+API_USERNAME = conf.coinzoom_username
+
+
+class CoinzoomExchangeUnitTest(unittest.TestCase):
+ events: List[MarketEvent] = [
+ MarketEvent.BuyOrderCompleted,
+ MarketEvent.SellOrderCompleted,
+ MarketEvent.OrderFilled,
+ MarketEvent.TransactionFailure,
+ MarketEvent.BuyOrderCreated,
+ MarketEvent.SellOrderCreated,
+ MarketEvent.OrderCancelled,
+ MarketEvent.OrderFailure
+ ]
+ connector: CoinzoomExchange
+ event_logger: EventLogger
+ trading_pair = "BTC-USD"
+ base_token, quote_token = trading_pair.split("-")
+ stack: contextlib.ExitStack
+
+ @classmethod
+ def setUpClass(cls):
+ global MAINNET_RPC_URL
+
+ cls.ev_loop = asyncio.get_event_loop()
+
+ cls.clock: Clock = Clock(ClockMode.REALTIME)
+ cls.connector: CoinzoomExchange = CoinzoomExchange(
+ coinzoom_api_key=API_KEY,
+ coinzoom_secret_key=API_SECRET,
+ coinzoom_username=API_USERNAME,
+ trading_pairs=[cls.trading_pair],
+ trading_required=True
+ )
+ print("Initializing Coinzoom market... this will take about a minute.")
+ cls.clock.add_iterator(cls.connector)
+ cls.stack: contextlib.ExitStack = contextlib.ExitStack()
+ cls._clock = cls.stack.enter_context(cls.clock)
+ cls.ev_loop.run_until_complete(cls.wait_til_ready())
+ print("Ready.")
+
+ @classmethod
+ def tearDownClass(cls) -> None:
+ cls.stack.close()
+
+ @classmethod
+ async def wait_til_ready(cls, connector = None):
+ if connector is None:
+ connector = cls.connector
+ async with timeout(90):
+ while True:
+ now = time.time()
+ next_iteration = now // 1.0 + 1
+ if connector.ready:
+ break
+ else:
+ await cls._clock.run_til(next_iteration)
+ await asyncio.sleep(1.0)
+
+ def setUp(self):
+ self.db_path: str = realpath(join(__file__, "../connector_test.sqlite"))
+ try:
+ os.unlink(self.db_path)
+ except FileNotFoundError:
+ pass
+
+ self.event_logger = EventLogger()
+ for event_tag in self.events:
+ self.connector.add_listener(event_tag, self.event_logger)
+
+ def tearDown(self):
+ for event_tag in self.events:
+ self.connector.remove_listener(event_tag, self.event_logger)
+ self.event_logger = None
+
+ async def run_parallel_async(self, *tasks):
+ future: asyncio.Future = safe_ensure_future(safe_gather(*tasks))
+ while not future.done():
+ now = time.time()
+ next_iteration = now // 1.0 + 1
+ await self._clock.run_til(next_iteration)
+ await asyncio.sleep(1.0)
+ return future.result()
+
+ def run_parallel(self, *tasks):
+ return self.ev_loop.run_until_complete(self.run_parallel_async(*tasks))
+
+ def _place_order(self, is_buy, amount, order_type, price, ex_order_id) -> str:
+ if is_buy:
+ cl_order_id = self.connector.buy(self.trading_pair, amount, order_type, price)
+ else:
+ cl_order_id = self.connector.sell(self.trading_pair, amount, order_type, price)
+ return cl_order_id
+
+ def _cancel_order(self, cl_order_id, connector=None):
+ if connector is None:
+ connector = self.connector
+ return connector.cancel(self.trading_pair, cl_order_id)
+
+ def test_estimate_fee(self):
+ maker_fee = self.connector.estimate_fee_pct(True)
+ self.assertAlmostEqual(maker_fee, Decimal("0.002"))
+ taker_fee = self.connector.estimate_fee_pct(False)
+ self.assertAlmostEqual(taker_fee, Decimal("0.0026"))
+
+ def test_buy_and_sell(self):
+ price = self.connector.get_price(self.trading_pair, True) * Decimal("1.02")
+ price = self.connector.quantize_order_price(self.trading_pair, price)
+ amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001"))
+ quote_bal = self.connector.get_available_balance(self.quote_token)
+ base_bal = self.connector.get_available_balance(self.base_token)
+
+ order_id = self._place_order(True, amount, OrderType.LIMIT, price, 1)
+ order_completed_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCompletedEvent))
+ self.ev_loop.run_until_complete(asyncio.sleep(5))
+ trade_events = [t for t in self.event_logger.event_log if isinstance(t, OrderFilledEvent)]
+ base_amount_traded = sum(t.amount for t in trade_events)
+ quote_amount_traded = sum(t.amount * t.price for t in trade_events)
+
+ self.assertTrue([evt.order_type == OrderType.LIMIT for evt in trade_events])
+ self.assertEqual(order_id, order_completed_event.order_id)
+ self.assertEqual(amount, order_completed_event.base_asset_amount)
+ self.assertEqual("BTC", order_completed_event.base_asset)
+ self.assertEqual("USD", order_completed_event.quote_asset)
+ self.assertAlmostEqual(base_amount_traded, order_completed_event.base_asset_amount)
+ self.assertAlmostEqual(quote_amount_traded, order_completed_event.quote_asset_amount)
+ self.assertGreater(order_completed_event.fee_amount, Decimal(0))
+ self.assertTrue(any([isinstance(event, BuyOrderCreatedEvent) and str(event.order_id) == str(order_id)
+ for event in self.event_logger.event_log]))
+
+ # check available quote balance gets updated, we need to wait a bit for the balance message to arrive
+ expected_quote_bal = quote_bal - quote_amount_traded
+ # self.ev_loop.run_until_complete(asyncio.sleep(1))
+ self.ev_loop.run_until_complete(self.connector._update_balances())
+ self.assertAlmostEqual(expected_quote_bal, self.connector.get_available_balance(self.quote_token), 1)
+
+ # Reset the logs
+ self.event_logger.clear()
+
+ # Try to sell back the same amount to the exchange, and watch for completion event.
+ price = self.connector.get_price(self.trading_pair, True) * Decimal("0.98")
+ price = self.connector.quantize_order_price(self.trading_pair, price)
+ amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001"))
+ order_id = self._place_order(False, amount, OrderType.LIMIT, price, 2)
+ order_completed_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(SellOrderCompletedEvent))
+ trade_events = [t for t in self.event_logger.event_log if isinstance(t, OrderFilledEvent)]
+ base_amount_traded = sum(t.amount for t in trade_events)
+ quote_amount_traded = sum(t.amount * t.price for t in trade_events)
+
+ self.assertTrue([evt.order_type == OrderType.LIMIT for evt in trade_events])
+ self.assertEqual(order_id, order_completed_event.order_id)
+ self.assertEqual(amount, order_completed_event.base_asset_amount)
+ self.assertEqual("BTC", order_completed_event.base_asset)
+ self.assertEqual("USD", order_completed_event.quote_asset)
+ self.assertAlmostEqual(base_amount_traded, order_completed_event.base_asset_amount)
+ self.assertAlmostEqual(quote_amount_traded, order_completed_event.quote_asset_amount)
+ self.assertGreater(order_completed_event.fee_amount, Decimal(0))
+ self.assertTrue(any([isinstance(event, SellOrderCreatedEvent) and event.order_id == order_id
+ for event in self.event_logger.event_log]))
+
+ # check available base balance gets updated, we need to wait a bit for the balance message to arrive
+ expected_base_bal = base_bal
+ self.ev_loop.run_until_complete(asyncio.sleep(1))
+ self.ev_loop.run_until_complete(self.connector._update_balances())
+ self.ev_loop.run_until_complete(asyncio.sleep(5))
+ self.assertAlmostEqual(expected_base_bal, self.connector.get_available_balance(self.base_token), 5)
+
+ def test_limit_makers_unfilled(self):
+ price = self.connector.get_price(self.trading_pair, True) * Decimal("0.8")
+ price = self.connector.quantize_order_price(self.trading_pair, price)
+ price_quantum = self.connector.get_order_price_quantum(self.trading_pair, price)
+ amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001"))
+ self.ev_loop.run_until_complete(asyncio.sleep(1))
+ self.ev_loop.run_until_complete(self.connector._update_balances())
+ self.ev_loop.run_until_complete(asyncio.sleep(2))
+ quote_bal = self.connector.get_available_balance(self.quote_token)
+
+ cl_order_id = self._place_order(True, amount, OrderType.LIMIT_MAKER, price, 1)
+ order_created_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCreatedEvent))
+ self.assertEqual(cl_order_id, order_created_event.order_id)
+ # check available quote balance gets updated, we need to wait a bit for the balance message to arrive
+ taker_fee = self.connector.estimate_fee_pct(False)
+ quote_amount = (math.ceil(((price * amount) * (Decimal("1") + taker_fee)) / price_quantum) * price_quantum)
+ expected_quote_bal = quote_bal - quote_amount
+ self.ev_loop.run_until_complete(asyncio.sleep(1))
+ self.ev_loop.run_until_complete(self.connector._update_balances())
+ self.ev_loop.run_until_complete(asyncio.sleep(2))
+
+ self.assertAlmostEqual(expected_quote_bal, self.connector.get_available_balance(self.quote_token), 5)
+ self._cancel_order(cl_order_id)
+ event = self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent))
+ self.assertEqual(cl_order_id, event.order_id)
+
+ price = self.connector.get_price(self.trading_pair, True) * Decimal("1.2")
+ price = self.connector.quantize_order_price(self.trading_pair, price)
+ amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001"))
+
+ cl_order_id = self._place_order(False, amount, OrderType.LIMIT_MAKER, price, 2)
+ order_created_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(SellOrderCreatedEvent))
+ self.assertEqual(cl_order_id, order_created_event.order_id)
+ self._cancel_order(cl_order_id)
+ event = self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent))
+ self.assertEqual(cl_order_id, event.order_id)
+
+ # # @TODO: find a way to create "rejected"
+ # def test_limit_maker_rejections(self):
+ # price = self.connector.get_price(self.trading_pair, True) * Decimal("1.2")
+ # price = self.connector.quantize_order_price(self.trading_pair, price)
+ # amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.000001"))
+ # cl_order_id = self._place_order(True, amount, OrderType.LIMIT_MAKER, price, 1)
+ # event = self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent))
+ # self.assertEqual(cl_order_id, event.order_id)
+
+ # price = self.connector.get_price(self.trading_pair, False) * Decimal("0.8")
+ # price = self.connector.quantize_order_price(self.trading_pair, price)
+ # amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.000001"))
+ # cl_order_id = self._place_order(False, amount, OrderType.LIMIT_MAKER, price, 2)
+ # event = self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent))
+ # self.assertEqual(cl_order_id, event.order_id)
+
+ def test_cancel_all(self):
+ bid_price = self.connector.get_price(self.trading_pair, True)
+ ask_price = self.connector.get_price(self.trading_pair, False)
+ bid_price = self.connector.quantize_order_price(self.trading_pair, bid_price * Decimal("0.9"))
+ ask_price = self.connector.quantize_order_price(self.trading_pair, ask_price * Decimal("1.1"))
+ amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001"))
+
+ buy_id = self._place_order(True, amount, OrderType.LIMIT, bid_price, 1)
+ sell_id = self._place_order(False, amount, OrderType.LIMIT, ask_price, 2)
+
+ self.ev_loop.run_until_complete(asyncio.sleep(1))
+ asyncio.ensure_future(self.connector.cancel_all(15))
+ self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent))
+ self.ev_loop.run_until_complete(asyncio.sleep(1))
+ cancel_events = [t for t in self.event_logger.event_log if isinstance(t, OrderCancelledEvent)]
+ self.assertEqual({buy_id, sell_id}, {o.order_id for o in cancel_events})
+
+ def test_order_quantized_values(self):
+ bid_price: Decimal = self.connector.get_price(self.trading_pair, True)
+ ask_price: Decimal = self.connector.get_price(self.trading_pair, False)
+ mid_price: Decimal = (bid_price + ask_price) / 2
+
+ # Make sure there's enough balance to make the limit orders.
+ self.assertGreater(self.connector.get_balance("BTC"), Decimal("0.0005"))
+ self.assertGreater(self.connector.get_balance("USD"), Decimal("10"))
+
+ # Intentionally set some prices with too many decimal places s.t. they
+ # need to be quantized. Also, place them far away from the mid-price s.t. they won't
+ # get filled during the test.
+ bid_price = self.connector.quantize_order_price(self.trading_pair, mid_price * Decimal("0.9333192292111341"))
+ ask_price = self.connector.quantize_order_price(self.trading_pair, mid_price * Decimal("1.1492431474884933"))
+ amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.000123456"))
+
+ # Test bid order
+ cl_order_id_1 = self._place_order(True, amount, OrderType.LIMIT, bid_price, 1)
+ # Wait for the order created event and examine the order made
+ self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCreatedEvent))
+
+ # Test ask order
+ cl_order_id_2 = self._place_order(False, amount, OrderType.LIMIT, ask_price, 1)
+ # Wait for the order created event and examine and order made
+ self.ev_loop.run_until_complete(self.event_logger.wait_for(SellOrderCreatedEvent))
+
+ self._cancel_order(cl_order_id_1)
+ self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent))
+ self._cancel_order(cl_order_id_2)
+ self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent))
+
+ def test_orders_saving_and_restoration(self):
+ config_path = "test_config"
+ strategy_name = "test_strategy"
+ sql = SQLConnectionManager(SQLConnectionType.TRADE_FILLS, db_path=self.db_path)
+ order_id = None
+ recorder = MarketsRecorder(sql, [self.connector], config_path, strategy_name)
+ recorder.start()
+
+ try:
+ self.connector._in_flight_orders.clear()
+ self.assertEqual(0, len(self.connector.tracking_states))
+
+ # Try to put limit buy order for 0.02 ETH worth of ZRX, and watch for order creation event.
+ current_bid_price: Decimal = self.connector.get_price(self.trading_pair, True)
+ price: Decimal = current_bid_price * Decimal("0.8")
+ price = self.connector.quantize_order_price(self.trading_pair, price)
+
+ amount: Decimal = Decimal("0.0001")
+ amount = self.connector.quantize_order_amount(self.trading_pair, amount)
+
+ cl_order_id = self._place_order(True, amount, OrderType.LIMIT_MAKER, price, 1)
+ order_created_event = self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCreatedEvent))
+ self.assertEqual(cl_order_id, order_created_event.order_id)
+
+ # Verify tracking states
+ self.assertEqual(1, len(self.connector.tracking_states))
+ self.assertEqual(cl_order_id, list(self.connector.tracking_states.keys())[0])
+
+ # Verify orders from recorder
+ recorded_orders: List[Order] = recorder.get_orders_for_config_and_market(config_path, self.connector)
+ self.assertEqual(1, len(recorded_orders))
+ self.assertEqual(cl_order_id, recorded_orders[0].id)
+
+ # Verify saved market states
+ saved_market_states: MarketState = recorder.get_market_states(config_path, self.connector)
+ self.assertIsNotNone(saved_market_states)
+ self.assertIsInstance(saved_market_states.saved_state, dict)
+ self.assertGreater(len(saved_market_states.saved_state), 0)
+
+ # Close out the current market and start another market.
+ self.connector.stop(self._clock)
+ self.ev_loop.run_until_complete(asyncio.sleep(5))
+ self.clock.remove_iterator(self.connector)
+ for event_tag in self.events:
+ self.connector.remove_listener(event_tag, self.event_logger)
+ # Clear the event loop
+ self.event_logger.clear()
+ new_connector = CoinzoomExchange(API_KEY, API_SECRET, API_USERNAME, [self.trading_pair], True)
+ for event_tag in self.events:
+ new_connector.add_listener(event_tag, self.event_logger)
+ recorder.stop()
+ recorder = MarketsRecorder(sql, [new_connector], config_path, strategy_name)
+ recorder.start()
+ saved_market_states = recorder.get_market_states(config_path, new_connector)
+ self.clock.add_iterator(new_connector)
+ self.ev_loop.run_until_complete(self.wait_til_ready(new_connector))
+ self.assertEqual(0, len(new_connector.limit_orders))
+ self.assertEqual(0, len(new_connector.tracking_states))
+ new_connector.restore_tracking_states(saved_market_states.saved_state)
+ self.assertEqual(1, len(new_connector.limit_orders))
+ self.assertEqual(1, len(new_connector.tracking_states))
+
+ # Cancel the order and verify that the change is saved.
+ self._cancel_order(cl_order_id, new_connector)
+ self.ev_loop.run_until_complete(self.event_logger.wait_for(OrderCancelledEvent))
+ recorder.save_market_states(config_path, new_connector)
+ order_id = None
+ self.assertEqual(0, len(new_connector.limit_orders))
+ self.assertEqual(0, len(new_connector.tracking_states))
+ saved_market_states = recorder.get_market_states(config_path, new_connector)
+ self.assertEqual(0, len(saved_market_states.saved_state))
+ finally:
+ if order_id is not None:
+ self.connector.cancel(self.trading_pair, cl_order_id)
+ self.run_parallel(self.event_logger.wait_for(OrderCancelledEvent))
+
+ recorder.stop()
+ os.unlink(self.db_path)
+
+ def test_update_last_prices(self):
+ # This is basic test to see if order_book last_trade_price is initiated and updated.
+ for order_book in self.connector.order_books.values():
+ for _ in range(5):
+ self.ev_loop.run_until_complete(asyncio.sleep(1))
+ self.assertFalse(math.isnan(order_book.last_trade_price))
+
+ def test_filled_orders_recorded(self):
+ config_path: str = "test_config"
+ strategy_name: str = "test_strategy"
+ sql = SQLConnectionManager(SQLConnectionType.TRADE_FILLS, db_path=self.db_path)
+ order_id = None
+ recorder = MarketsRecorder(sql, [self.connector], config_path, strategy_name)
+ recorder.start()
+
+ try:
+ # Try to buy some token from the exchange, and watch for completion event.
+ price = self.connector.get_price(self.trading_pair, True) * Decimal("1.05")
+ price = self.connector.quantize_order_price(self.trading_pair, price)
+ amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001"))
+
+ order_id = self._place_order(True, amount, OrderType.LIMIT, price, 1)
+ self.ev_loop.run_until_complete(self.event_logger.wait_for(BuyOrderCompletedEvent))
+ self.ev_loop.run_until_complete(asyncio.sleep(1))
+
+ # Reset the logs
+ self.event_logger.clear()
+
+ # Try to sell back the same amount to the exchange, and watch for completion event.
+ price = self.connector.get_price(self.trading_pair, True) * Decimal("0.95")
+ price = self.connector.quantize_order_price(self.trading_pair, price)
+ amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.0001"))
+ order_id = self._place_order(False, amount, OrderType.LIMIT, price, 2)
+ self.ev_loop.run_until_complete(self.event_logger.wait_for(SellOrderCompletedEvent))
+ self.ev_loop.run_until_complete(asyncio.sleep(1))
+
+ # Query the persisted trade logs
+ trade_fills: List[TradeFill] = recorder.get_trades_for_config(config_path)
+ self.assertGreaterEqual(len(trade_fills), 2)
+ buy_fills: List[TradeFill] = [t for t in trade_fills if t.trade_type == "BUY"]
+ sell_fills: List[TradeFill] = [t for t in trade_fills if t.trade_type == "SELL"]
+ self.assertGreaterEqual(len(buy_fills), 1)
+ self.assertGreaterEqual(len(sell_fills), 1)
+
+ order_id = None
+
+ finally:
+ if order_id is not None:
+ self.connector.cancel(self.trading_pair, order_id)
+ self.run_parallel(self.event_logger.wait_for(OrderCancelledEvent))
+
+ recorder.stop()
+ os.unlink(self.db_path)
diff --git a/test/connector/exchange/coinzoom/test_coinzoom_order_book_tracker.py b/test/connector/exchange/coinzoom/test_coinzoom_order_book_tracker.py
new file mode 100755
index 0000000000..62a1a1b6d9
--- /dev/null
+++ b/test/connector/exchange/coinzoom/test_coinzoom_order_book_tracker.py
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+import sys
+import math
+import time
+import asyncio
+import logging
+import unittest
+from async_timeout import timeout
+from os.path import join, realpath
+from typing import Dict, Optional, List
+from hummingbot.core.event.event_logger import EventLogger
+from hummingbot.core.event.events import OrderBookEvent, OrderBookTradeEvent, TradeType
+from hummingbot.connector.exchange.coinzoom.coinzoom_order_book_tracker import CoinzoomOrderBookTracker
+from hummingbot.connector.exchange.coinzoom.coinzoom_api_order_book_data_source import CoinzoomAPIOrderBookDataSource
+from hummingbot.core.data_type.order_book import OrderBook
+from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL
+
+
+sys.path.insert(0, realpath(join(__file__, "../../../../../")))
+logging.basicConfig(level=METRICS_LOG_LEVEL)
+
+
+class CoinzoomOrderBookTrackerUnitTest(unittest.TestCase):
+ order_book_tracker: Optional[CoinzoomOrderBookTracker] = None
+ events: List[OrderBookEvent] = [
+ OrderBookEvent.TradeEvent
+ ]
+ trading_pairs: List[str] = [
+ "BTC-USD",
+ "ETH-USD",
+ ]
+
+ @classmethod
+ def setUpClass(cls):
+ cls.ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop()
+ cls.order_book_tracker: CoinzoomOrderBookTracker = CoinzoomOrderBookTracker(cls.trading_pairs)
+ cls.order_book_tracker.start()
+ cls.ev_loop.run_until_complete(cls.wait_til_tracker_ready())
+
+ @classmethod
+ async def wait_til_tracker_ready(cls):
+ async with timeout(20):
+ while True:
+ if len(cls.order_book_tracker.order_books) > 0:
+ print("Initialized real-time order books.")
+ return
+ await asyncio.sleep(1)
+
+ async def run_parallel_async(self, *tasks, timeout=None):
+ future: asyncio.Future = asyncio.ensure_future(asyncio.gather(*tasks))
+ timer = 0
+ while not future.done():
+ if timeout and timer > timeout:
+ raise Exception("Timeout running parallel async tasks in tests")
+ timer += 1
+ now = time.time()
+ _next_iteration = now // 1.0 + 1 # noqa: F841
+ await asyncio.sleep(1.0)
+ return future.result()
+
+ def run_parallel(self, *tasks):
+ return self.ev_loop.run_until_complete(self.run_parallel_async(*tasks, timeout=60))
+
+ def setUp(self):
+ self.event_logger = EventLogger()
+ for event_tag in self.events:
+ for trading_pair, order_book in self.order_book_tracker.order_books.items():
+ order_book.add_listener(event_tag, self.event_logger)
+
+ def test_order_book_trade_event_emission(self):
+ """
+ Tests if the order book tracker is able to retrieve order book trade message from exchange and emit order book
+ trade events after correctly parsing the trade messages
+ """
+ self.run_parallel(self.event_logger.wait_for(OrderBookTradeEvent))
+ for ob_trade_event in self.event_logger.event_log:
+ self.assertTrue(type(ob_trade_event) == OrderBookTradeEvent)
+ self.assertTrue(ob_trade_event.trading_pair in self.trading_pairs)
+ self.assertTrue(type(ob_trade_event.timestamp) in [float, int])
+ self.assertTrue(type(ob_trade_event.amount) == float)
+ self.assertTrue(type(ob_trade_event.price) == float)
+ self.assertTrue(type(ob_trade_event.type) == TradeType)
+ # datetime is in milliseconds
+ self.assertTrue(math.ceil(math.log10(ob_trade_event.timestamp)) == 13)
+ self.assertTrue(ob_trade_event.amount > 0)
+ self.assertTrue(ob_trade_event.price > 0)
+
+ def test_tracker_integrity(self):
+ # Wait 5 seconds to process some diffs.
+ self.ev_loop.run_until_complete(asyncio.sleep(5.0))
+ order_books: Dict[str, OrderBook] = self.order_book_tracker.order_books
+ eth_usd: OrderBook = order_books["ETH-USD"]
+ self.assertIsNot(eth_usd.last_diff_uid, 0)
+ self.assertGreaterEqual(eth_usd.get_price_for_volume(True, 10).result_price,
+ eth_usd.get_price(True))
+ self.assertLessEqual(eth_usd.get_price_for_volume(False, 10).result_price,
+ eth_usd.get_price(False))
+
+ def test_api_get_last_traded_prices(self):
+ prices = self.ev_loop.run_until_complete(
+ CoinzoomAPIOrderBookDataSource.get_last_traded_prices(["BTC-USD", "LTC-BTC"]))
+ for key, value in prices.items():
+ print(f"{key} last_trade_price: {value}")
+ self.assertGreater(prices["BTC-USD"], 1000)
+ self.assertLess(prices["LTC-BTC"], 1)
diff --git a/test/connector/exchange/coinzoom/test_coinzoom_user_stream_tracker.py b/test/connector/exchange/coinzoom/test_coinzoom_user_stream_tracker.py
new file mode 100644
index 0000000000..f9f85f335d
--- /dev/null
+++ b/test/connector/exchange/coinzoom/test_coinzoom_user_stream_tracker.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+
+import sys
+import asyncio
+import logging
+import unittest
+import conf
+
+from os.path import join, realpath
+from hummingbot.connector.exchange.coinzoom.coinzoom_user_stream_tracker import CoinzoomUserStreamTracker
+from hummingbot.connector.exchange.coinzoom.coinzoom_auth import CoinzoomAuth
+from hummingbot.core.utils.async_utils import safe_ensure_future
+from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL
+
+
+sys.path.insert(0, realpath(join(__file__, "../../../../../")))
+logging.basicConfig(level=METRICS_LOG_LEVEL)
+
+
+class CoinzoomUserStreamTrackerUnitTest(unittest.TestCase):
+ api_key = conf.coinzoom_api_key
+ api_secret = conf.coinzoom_secret_key
+ api_username = conf.coinzoom_username
+
+ @classmethod
+ def setUpClass(cls):
+ cls.ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop()
+ cls.trading_pairs = ["BTC-USD"]
+ cls.user_stream_tracker: CoinzoomUserStreamTracker = CoinzoomUserStreamTracker(
+ coinzoom_auth=CoinzoomAuth(cls.api_key, cls.api_secret, cls.api_username),
+ trading_pairs=cls.trading_pairs)
+ cls.user_stream_tracker_task: asyncio.Task = safe_ensure_future(cls.user_stream_tracker.start())
+
+ def test_user_stream(self):
+ # Wait process some msgs.
+ print("Sleeping for 30s to gather some user stream messages.")
+ self.ev_loop.run_until_complete(asyncio.sleep(30.0))
+ print(self.user_stream_tracker.user_stream)