Skip to content

Commit

Permalink
Merge pull request hummingbot#6840 from yancong001/feat/kraken_connector
Browse files Browse the repository at this point in the history
Feat/kraken connector
  • Loading branch information
rapcmia authored Mar 12, 2024
2 parents c8159e0 + 518ca2d commit a24cc94
Show file tree
Hide file tree
Showing 25 changed files with 2,726 additions and 2,505 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ test:
--exclude-dir="test/mock" \
--exclude-dir="test/hummingbot/connector/gateway/amm" \
--exclude-dir="test/hummingbot/connector/exchange/coinbase_pro" \
--exclude-dir="test/hummingbot/connector/exchange/kraken" \
--exclude-dir="test/hummingbot/connector/exchange/hitbtc" \
--exclude-dir="test/hummingbot/connector/exchange/foxbit" \
--exclude-dir="test/hummingbot/connector/gateway/clob_spot/data_sources/dexalot" \
Expand Down
364 changes: 121 additions & 243 deletions hummingbot/connector/exchange/kraken/kraken_api_order_book_data_source.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,48 +1,33 @@
import asyncio
import logging
from typing import Any, Dict, Optional
from typing import TYPE_CHECKING, Any, Dict, Optional

from hummingbot.connector.exchange.kraken import kraken_constants as CONSTANTS
from hummingbot.connector.exchange.kraken.kraken_auth import KrakenAuth
from hummingbot.connector.exchange.kraken.kraken_order_book import KrakenOrderBook
from hummingbot.connector.exchange.kraken.kraken_utils import build_api_factory
from hummingbot.core.api_throttler.async_throttler import AsyncThrottler
from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource
from hummingbot.core.web_assistant.connections.data_types import RESTMethod, RESTRequest, WSJSONRequest
from hummingbot.core.web_assistant.rest_assistant import RESTAssistant
from hummingbot.core.web_assistant.connections.data_types import WSJSONRequest
from hummingbot.core.web_assistant.web_assistants_factory import WebAssistantsFactory
from hummingbot.core.web_assistant.ws_assistant import WSAssistant
from hummingbot.logger import HummingbotLogger

MESSAGE_TIMEOUT = 3.0
PING_TIMEOUT = 5.0
if TYPE_CHECKING:
from hummingbot.connector.exchange.kraken.kraken_exchange import KrakenExchange


class KrakenAPIUserStreamDataSource(UserStreamTrackerDataSource):

_krausds_logger: Optional[HummingbotLogger] = None

@classmethod
def logger(cls) -> HummingbotLogger:
if cls._krausds_logger is None:
cls._krausds_logger = logging.getLogger(__name__)
return cls._krausds_logger
_logger: Optional[HummingbotLogger] = None

def __init__(self,
throttler: AsyncThrottler,
kraken_auth: KrakenAuth,
connector: 'KrakenExchange',
api_factory: Optional[WebAssistantsFactory] = None):
self._throttler = throttler
self._api_factory = api_factory or build_api_factory(throttler=throttler)
self._rest_assistant = None
self._ws_assistant = None
self._kraken_auth: KrakenAuth = kraken_auth
self._current_auth_token: Optional[str] = None

super().__init__()
self._api_factory = api_factory
self._connector = connector
self._current_auth_token: Optional[str] = None

@property
def order_book_class(self):
return KrakenOrderBook
async def _connected_websocket_assistant(self) -> WSAssistant:
ws: WSAssistant = await self._api_factory.get_ws_assistant()
await ws.connect(ws_url=CONSTANTS.WS_AUTH_URL, ping_timeout=CONSTANTS.PING_TIMEOUT)
return ws

@property
def last_recv_time(self):
Expand All @@ -51,76 +36,63 @@ def last_recv_time(self):
else:
return self._ws_assistant.last_recv_time

async def _get_rest_assistant(self) -> RESTAssistant:
if self._rest_assistant is None:
self._rest_assistant = await self._api_factory.get_rest_assistant()
return self._rest_assistant

async def get_auth_token(self) -> str:
api_auth: Dict[str, Any] = self._kraken_auth.generate_auth_dict(uri=CONSTANTS.GET_TOKEN_PATH_URL)

url = f"{CONSTANTS.BASE_URL}{CONSTANTS.GET_TOKEN_PATH_URL}"

request = RESTRequest(
method=RESTMethod.POST,
url=url,
headers=api_auth["headers"],
data=api_auth["postDict"]
)
rest_assistant = await self._get_rest_assistant()

async with self._throttler.execute_task(CONSTANTS.GET_TOKEN_PATH_URL):
response = await rest_assistant.call(request=request, timeout=100)
if response.status != 200:
raise IOError(f"Error fetching Kraken user stream listen key. HTTP status is {response.status}.")

try:
response_json: Dict[str, Any] = await response.json()
except Exception:
raise IOError(f"Error parsing data from {url}.")

err = response_json["error"]
if "EAPI:Invalid nonce" in err:
self.logger().error(f"Invalid nonce error from {url}. " +
"Please ensure your Kraken API key nonce window is at least 10, " +
"and if needed reset your API key.")
raise IOError({"error": response_json})

return response_json["result"]["token"]

async def listen_for_user_stream(self, output: asyncio.Queue):
ws = None
while True:
try:
async with self._throttler.execute_task(CONSTANTS.WS_CONNECTION_LIMIT_ID):
ws: WSAssistant = await self._api_factory.get_ws_assistant()
await ws.connect(ws_url=CONSTANTS.WS_AUTH_URL, ping_timeout=PING_TIMEOUT)

if self._current_auth_token is None:
self._current_auth_token = await self.get_auth_token()

for subscription_type in ["openOrders", "ownTrades"]:
subscribe_request: WSJSONRequest = WSJSONRequest({
"event": "subscribe",
"subscription": {
"name": subscription_type,
"token": self._current_auth_token
}
})
await ws.send(subscribe_request)

async for ws_response in ws.iter_messages():
msg = ws_response.data
if not (type(msg) is dict and "event" in msg.keys() and
msg["event"] in ["heartbeat", "systemStatus", "subscriptionStatus"]):
output.put_nowait(msg)
except asyncio.CancelledError:
raise
except Exception:
self.logger().error("Unexpected error with Kraken WebSocket connection. "
"Retrying after 30 seconds...", exc_info=True)
self._current_auth_token = None
await asyncio.sleep(30.0)
finally:
if ws is not None:
await ws.disconnect()
try:
response_json = await self._connector._api_post(path_url=CONSTANTS.GET_TOKEN_PATH_URL, params={},
is_auth_required=True)
except Exception:
raise
return response_json["token"]

async def _subscribe_channels(self, websocket_assistant: WSAssistant):
"""
Subscribes to order events and balance events.
:param websocket_assistant: the websocket assistant used to connect to the exchange
"""
try:

if self._current_auth_token is None:
self._current_auth_token = await self.get_auth_token()

orders_change_payload = {
"event": "subscribe",
"subscription": {
"name": "openOrders",
"token": self._current_auth_token
}
}
subscribe_order_change_request: WSJSONRequest = WSJSONRequest(payload=orders_change_payload)

trades_payload = {
"event": "subscribe",
"subscription": {
"name": "ownTrades",
"token": self._current_auth_token
}
}
subscribe_trades_request: WSJSONRequest = WSJSONRequest(payload=trades_payload)

await websocket_assistant.send(subscribe_order_change_request)
await websocket_assistant.send(subscribe_trades_request)

self.logger().info("Subscribed to private order changes and trades updates channels...")
except asyncio.CancelledError:
raise
except Exception:
self.logger().exception("Unexpected error occurred subscribing to user streams...")
raise

async def _process_event_message(self, event_message: Dict[str, Any], queue: asyncio.Queue):
if type(event_message) is list and event_message[-2] in [
CONSTANTS.USER_TRADES_ENDPOINT_NAME,
CONSTANTS.USER_ORDERS_ENDPOINT_NAME,
]:
queue.put_nowait(event_message)
else:
if event_message.get("errorMessage") is not None:
err_msg = event_message.get("errorMessage")
raise IOError({
"label": "WSS_ERROR",
"message": f"Error received via websocket - {err_msg}."
})
48 changes: 38 additions & 10 deletions hummingbot/connector/exchange/kraken/kraken_auth.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,48 @@
from typing import (
Optional,
Dict,
Any
)
import base64
import hashlib
import hmac
from hummingbot.connector.exchange.kraken.kraken_tracking_nonce import get_tracking_nonce
import json
import time
from typing import Any, Dict, Optional
from urllib.parse import urlparse

from hummingbot.connector.time_synchronizer import TimeSynchronizer
from hummingbot.core.web_assistant.auth import AuthBase
from hummingbot.core.web_assistant.connections.data_types import RESTRequest, WSRequest

class KrakenAuth:
def __init__(self, api_key: str, secret_key: str):

class KrakenAuth(AuthBase):
_last_tracking_nonce: int = 0

def __init__(self, api_key: str, secret_key: str, time_provider: TimeSynchronizer):
self.api_key = api_key
self.secret_key = secret_key
self.time_provider = time_provider

@classmethod
def get_tracking_nonce(self) -> str:
nonce = int(time.time())
self._last_tracking_nonce = nonce if nonce > self._last_tracking_nonce else self._last_tracking_nonce + 1
return str(self._last_tracking_nonce)

async def rest_authenticate(self, request: RESTRequest) -> RESTRequest:

data = json.loads(request.data) if request.data is not None else {}
_path = urlparse(request.url).path

auth_dict: Dict[str, Any] = self._generate_auth_dict(_path, data)
request.headers = auth_dict["headers"]
request.data = auth_dict["postDict"]
return request

async def ws_authenticate(self, request: WSRequest) -> WSRequest:
"""
This method is intended to configure a websocket request to be authenticated. Mexc does not use this
functionality
"""
return request # pass-through

def generate_auth_dict(self, uri: str, data: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
def _generate_auth_dict(self, uri: str, data: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""
Generates authentication signature and returns it in a dictionary
:return: a dictionary of request info including the request signature and post data
Expand All @@ -25,7 +53,7 @@ def generate_auth_dict(self, uri: str, data: Optional[Dict[str, str]] = None) ->

# Variables (API method, nonce, and POST data)
api_path: bytes = bytes(uri, 'utf-8')
api_nonce: str = get_tracking_nonce()
api_nonce: str = self.get_tracking_nonce()
api_post: str = "nonce=" + api_nonce

if data is not None:
Expand Down
34 changes: 29 additions & 5 deletions hummingbot/connector/exchange/kraken/kraken_constants.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from enum import Enum
from typing import (
Dict,
Tuple,
)
from hummingbot.core.api_throttler.data_types import RateLimit, LinkedLimitWeightPair
from typing import Dict, Tuple

from hummingbot.core.api_throttler.data_types import LinkedLimitWeightPair, RateLimit
from hummingbot.core.data_type.in_flight_order import OrderState

DEFAULT_DOMAIN = "kraken"
MAX_ORDER_ID_LEN = 32
HBOT_ORDER_ID_PREFIX = "HBOT"

MAX_ID_BIT_COUNT = 31


class KrakenAPITier(Enum):
Expand Down Expand Up @@ -48,10 +53,29 @@ class KrakenAPITier(Enum):
BALANCE_PATH_URL = "/0/private/Balance"
OPEN_ORDERS_PATH_URL = "/0/private/OpenOrders"
QUERY_ORDERS_PATH_URL = "/0/private/QueryOrders"
QUERY_TRADES_PATH_URL = "/0/private/QueryTrades"


UNKNOWN_ORDER_MESSAGE = "Unknown order"
# Order States
ORDER_STATE = {
"pending": OrderState.OPEN,
"open": OrderState.OPEN,
"closed": OrderState.FILLED,
"canceled": OrderState.CANCELED,
"expired": OrderState.FAILED,
}
ORDER_NOT_EXIST_ERROR_CODE = "Error fetching status update for the order"

WS_URL = "wss://ws.kraken.com"
WS_AUTH_URL = "wss://ws-auth.kraken.com/"

DIFF_EVENT_TYPE = "book"
TRADE_EVENT_TYPE = "trade"
PING_TIMEOUT = 10
USER_TRADES_ENDPOINT_NAME = "ownTrades"
USER_ORDERS_ENDPOINT_NAME = "openOrders"

PUBLIC_ENDPOINT_LIMIT_ID = "PublicEndpointLimitID"
PUBLIC_ENDPOINT_LIMIT = 1
PUBLIC_ENDPOINT_LIMIT_INTERVAL = 1
Expand Down
43 changes: 0 additions & 43 deletions hummingbot/connector/exchange/kraken/kraken_exchange.pxd

This file was deleted.

Loading

0 comments on commit a24cc94

Please sign in to comment.