From 6f5ec7de15de60150670f317da6057d3cd12970f Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Mon, 8 Aug 2022 12:06:59 -0400 Subject: [PATCH 01/13] feat: Implement barebones GW ratelimiter, implement resume url usage. This also refactors some attributes to utilise the ratelimiter and more importantly, the heartbeat loop. (It still works the same!) --- interactions/api/gateway/__init__.py | 1 + interactions/api/gateway/client.py | 73 ++++++++++++++++++++------- interactions/api/gateway/ratelimit.py | 61 ++++++++++++++++++++++ 3 files changed, 117 insertions(+), 18 deletions(-) create mode 100644 interactions/api/gateway/ratelimit.py diff --git a/interactions/api/gateway/__init__.py b/interactions/api/gateway/__init__.py index 3f0423487..903f17955 100644 --- a/interactions/api/gateway/__init__.py +++ b/interactions/api/gateway/__init__.py @@ -6,3 +6,4 @@ """ from .client import * # noqa: F401 F403 from .heartbeat import * # noqa: F401 F403 +from .ratelimit import * # noqa: F401 F403 diff --git a/interactions/api/gateway/client.py b/interactions/api/gateway/client.py index 667092fe6..f5132aa98 100644 --- a/interactions/api/gateway/client.py +++ b/interactions/api/gateway/client.py @@ -6,11 +6,12 @@ from asyncio import ( Event, Task, + TimeoutError, ensure_future, get_event_loop, get_running_loop, new_event_loop, - sleep, + wait_for, ) from sys import platform, version_info from time import perf_counter @@ -33,6 +34,7 @@ from ..models.misc import Snowflake from ..models.presence import ClientPresence from .heartbeat import _Heartbeat +from .ratelimit import WSRateLimit if TYPE_CHECKING: from ...client.context import _Context @@ -47,8 +49,13 @@ class WebSocketClient: """ A class representing the client's connection to the Gateway via. WebSocket. + .. note :: + The ``__heartbeat_event`` Event object is different from the one built in to the Heartbeater object. + The latter is used to trace heartbeat acknowledgement. + :ivar AbstractEventLoop _loop: The asynchronous event loop. :ivar Listener _dispatch: The built-in event dispatcher. + :ivar WSRateLimit _ratelimiter: The websocket ratelimiter object. :ivar HTTPClient _http: The user-facing HTTP client. :ivar ClientWebSocketResponse _client: The WebSocket data of the connection. :ivar bool _closed: Whether the connection has been closed or not. @@ -56,6 +63,7 @@ class WebSocketClient: :ivar Intents _intents: The gateway intents used for connection. :ivar dict _ready: The contents of the application returned when ready. :ivar _Heartbeat __heartbeater: The context state of a "heartbeat" made to the Gateway. + :ivar Event __heartbeat_event: The state of the overall heartbeat process. :ivar Optional[List[Tuple[int]]] __shard: The shards used during connection. :ivar Optional[ClientPresence] __presence: The presence used in connection. :ivar Event ready: The ready state of the client as an ``asyncio.Event``. @@ -65,12 +73,13 @@ class WebSocketClient: :ivar Optional[int] sequence: The sequence identifier of the ongoing session. :ivar float _last_send: The latest time of the last send_packet function call since connection creation, in seconds. :ivar float _last_ack: The latest time of the last ``HEARTBEAT_ACK`` event since connection creation, in seconds. - :ivar float latency: The latency of the connection, in seconds. + :ivar Optional[str] resume_url: The Websocket ratelimit URL for resuming connections, if any. """ __slots__ = ( "_loop", "_dispatch", + "_ratelimiter", "_http", "_client", "_closed", @@ -81,13 +90,14 @@ class WebSocketClient: "__shard", "__presence", "__task", + "__heartbeat_event", "__started", "session_id", "sequence", "ready", "_last_send", "_last_ack", - "latency", + "resume_url", ) def __init__( @@ -112,8 +122,15 @@ def __init__( except RuntimeError: self._loop = new_event_loop() self._dispatch: Listener = Listener() + self._ratelimiter = ( + WSRateLimit(loop=self._loop) if version_info < (3, 10) else WSRateLimit() + ) + self.__heartbeater: _Heartbeat = _Heartbeat( + loop=self._loop if version_info < (3, 10) else None + ) self._http: HTTPClient = HTTPClient(token) self._client: Optional["ClientWebSocketResponse"] = None + self._closed: bool = False self._options: dict = { "max_msg_size": 1024**2, @@ -121,36 +138,55 @@ def __init__( "autoclose": False, "compress": 0, } + self._intents: Intents = intents - self.__heartbeater: _Heartbeat = _Heartbeat( - loop=self._loop if version_info < (3, 10) else None - ) self.__shard: Optional[List[Tuple[int]]] = None self.__presence: Optional[ClientPresence] = None + self.__task: Optional[Task] = None + self.__heartbeat_event = Event(loop=self._loop) if version_info < (3, 10) else Event() self.__started: bool = False + self.session_id: Optional[str] = None if session_id is MISSING else session_id self.sequence: Optional[str] = None if sequence is MISSING else sequence self.ready: Event = Event(loop=self._loop) if version_info < (3, 10) else Event() self._last_send: float = perf_counter() self._last_ack: float = perf_counter() - self.latency: float = float("nan") # noqa: F821 # self.latency has to be noqa, this is valid in python but not in Flake8. + self.resume_url: Optional[str] = None + + @property + def latency(self) -> float: + """ + The latency of the connection, in seconds. + """ + return self._last_ack - self._last_send async def _manage_heartbeat(self) -> None: """Manages the heartbeat loop.""" - while True: + log.debug(f"Sending heartbeat every {self.__heartbeater.delay} seconds...") + while not self.__heartbeat_event.is_set(): + if self._closed: await self.__restart() - if self.__heartbeater.event.is_set(): - await self.__heartbeat() - self.__heartbeater.event.clear() - await sleep(self.__heartbeater.delay / 1000) - else: + + if not self.__heartbeater.event.is_set(): log.debug("HEARTBEAT_ACK missing, reconnecting...") - await self.__restart() - break + await self.__restart() # resume here. + + self.__heartbeater.event.clear() + await self.__heartbeat() + + try: + # wait for next iteration, accounting for latency + await wait_for( + self.__heartbeat_event.wait(), timeout=self.__heartbeater.delay / 1000 + ) + except TimeoutError: + continue # Then we can check heartbeat ack this way and then like it autorestarts. + else: + return # break loop because something went wrong. async def __restart(self) -> None: """Restart the client's connection and heartbeat with the Gateway.""" @@ -177,7 +213,7 @@ async def _establish_connection( self.__heartbeater.delay = 0.0 self._closed = False self._options["headers"] = {"User-Agent": self._http._req._headers["User-Agent"]} - url = await self._http.get_gateway() + url = self.resume_url if self.resume_url else await self._http.get_gateway() async with self._http._req._session.ws_connect(url, **self._options) as self._client: self._closed = self._client.closed @@ -241,7 +277,6 @@ async def _handle_connection( self._last_ack = perf_counter() log.debug("HEARTBEAT_ACK") self.__heartbeater.event.set() - self.latency = self._last_ack - self._last_send if op in (OpCodeType.INVALIDATE_SESSION, OpCodeType.RECONNECT): log.debug("INVALID_SESSION/RECONNECT") @@ -261,6 +296,7 @@ async def _handle_connection( self.session_id = data["session_id"] self.sequence = stream["s"] self._dispatch.dispatch("on_ready") + self.resume_url = data["resume_gateway_url"] if not self.__started: self.__started = True self._dispatch.dispatch("on_start") @@ -680,9 +716,10 @@ async def _send_packet(self, data: Dict[str, Any]) -> None: :param data: The data to send to the Gateway. :type data: Dict[str, Any] """ - self._last_send = perf_counter() _data = dumps(data) if isinstance(data, dict) else data packet: str = _data.decode("utf-8") if isinstance(_data, bytes) else _data + await self._ratelimiter.block() + self._last_send = perf_counter() await self._client.send_str(packet) log.debug(packet) diff --git a/interactions/api/gateway/ratelimit.py b/interactions/api/gateway/ratelimit.py new file mode 100644 index 000000000..de58c6fb3 --- /dev/null +++ b/interactions/api/gateway/ratelimit.py @@ -0,0 +1,61 @@ +import asyncio +import logging +from sys import version_info +from time import time +from typing import Optional + +log = logging.getLogger("gateway.ratelimit") + + +class WSRateLimit: + """ + A class that controls Gateway ratelimits using locking and a timer. + + .. note :: + While the docs state that the Gateway ratelimits are 120/60 (120 requests per 60 seconds), + this ratelimit offsets to 110 instead of 120 for room. + + :ivar Lock lock: The gateway Lock object. + :ivar int max: The upper limit of the ratelimit. Defaults to `110` seconds. + :ivar int remaining: How many requests are left per ``per_second``. This is automatically decremented and reset. + :ivar float current_limit: When this cooldown session began. This is defined automatically. + :ivar float per_second: A constant denoting how many requests can be done per unit of seconds. (i.e., per 60 seconds, per 45, etc.) + """ + + def __init__(self, loop=Optional[asyncio.AbstractEventLoop]): + self.lock = asyncio.Lock(loop=loop) if version_info < (3, 10) else asyncio.Lock() + # To conserve timings, we need to do 110/60 + + self.max = self.remaining = 110 + self.per_second = 60.0 + self.current_limit = 0.0 + + def is_ratelimited(self): + current = time() + if current > self.current_limit + self.per_second: + return False + return self.remaining == 0 + + def get_delay(self): + current = time() + + if current > self.current_limit + self.per_second: + self.remaining = self.max + + if self.remaining == self.max: + self.current_limit = current + + if self.remaining == 0: + return self.per_second - (current - self.current_limit) + + self.remaining -= 1 + if self.remaining == 0: + self.current_limit = current + + return 0.0 + + async def block(self): + async with self.lock: + if delta := self.get_delay(): + log.warning(f"We are ratelimited. Please wait {delta} seconds...") + await asyncio.sleep(delta) From 73c13e11d25b59bd0c27391e4d094362c8da4a11 Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Tue, 9 Aug 2022 20:49:21 -0400 Subject: [PATCH 02/13] feat: Refactor variable switching, optimise seq definitions, written primitive restart command. --- interactions/api/gateway/client.py | 74 ++++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 5 deletions(-) diff --git a/interactions/api/gateway/client.py b/interactions/api/gateway/client.py index f5132aa98..31c24c06f 100644 --- a/interactions/api/gateway/client.py +++ b/interactions/api/gateway/client.py @@ -5,6 +5,7 @@ from asyncio import ( Event, + Lock, Task, TimeoutError, ensure_future, @@ -74,6 +75,7 @@ class WebSocketClient: :ivar float _last_send: The latest time of the last send_packet function call since connection creation, in seconds. :ivar float _last_ack: The latest time of the last ``HEARTBEAT_ACK`` event since connection creation, in seconds. :ivar Optional[str] resume_url: The Websocket ratelimit URL for resuming connections, if any. + :ivar Lock reconnect_lock: The lock used for reconnecting the client. """ __slots__ = ( @@ -83,6 +85,7 @@ class WebSocketClient: "_http", "_client", "_closed", + "__closed", # placeholder to work with variables atm. its event variant of "_closed" "_options", "_intents", "_ready", @@ -98,6 +101,7 @@ class WebSocketClient: "_last_send", "_last_ack", "resume_url", + "reconnect_lock", ) def __init__( @@ -132,6 +136,7 @@ def __init__( self._client: Optional["ClientWebSocketResponse"] = None self._closed: bool = False + self.__closed: Event = Event(loop=self._loop) if version_info < (3, 10) else Event() self._options: dict = { "max_msg_size": 1024**2, "timeout": 60, @@ -153,8 +158,9 @@ def __init__( self._last_send: float = perf_counter() self._last_ack: float = perf_counter() - # self.latency has to be noqa, this is valid in python but not in Flake8. + self.resume_url: Optional[str] = None + self.reconnect_lock = Lock(loop=self._loop) if version_info < (3, 10) else Lock() @property def latency(self) -> float: @@ -168,9 +174,6 @@ async def _manage_heartbeat(self) -> None: log.debug(f"Sending heartbeat every {self.__heartbeater.delay} seconds...") while not self.__heartbeat_event.is_set(): - if self._closed: - await self.__restart() - if not self.__heartbeater.event.is_set(): log.debug("HEARTBEAT_ACK missing, reconnecting...") await self.__restart() # resume here. @@ -255,6 +258,10 @@ async def _handle_connection( event: Optional[str] = stream.get("t") data: Optional[Dict[str, Any]] = stream.get("d") + seq: Optional[str] = stream.get("s") + if seq: + self.sequence = seq + if op != OpCodeType.DISPATCH: log.debug(data) @@ -294,7 +301,6 @@ async def _handle_connection( elif event == "READY": self._ready = data self.session_id = data["session_id"] - self.sequence = stream["s"] self._dispatch.dispatch("on_ready") self.resume_url = data["resume_gateway_url"] if not self.__started: @@ -685,6 +691,30 @@ def __option_type_context(self, context: "_Context", type: int) -> dict: async def restart(self) -> None: await self.__restart() + async def _reconnect(self, to_resume: bool, code: Optional[int] = 1012) -> None: + """ + Restart the client's connection and heartbeat with the Gateway. + """ + + async with self.reconnect_lock: + self.__closed.clear() + + if self._client is not None: + await self._client.close(code=code) + + self._client = None + + if not to_resume: + self.session_id = None + url = self._http.get_gateway() + else: + url = self.resume_url + + self._client = await self._http._req._session.ws_connect(url, **self._options) + + self.__closed.set() + self.__heartbeat_event.set() + @property async def __receive_packet_stream(self) -> Optional[Union[Dict[str, Any], WSMessage]]: """ @@ -709,6 +739,40 @@ async def __receive_packet_stream(self) -> Optional[Union[Dict[str, Any], WSMess return loads(packet.data) if packet and isinstance(packet.data, str) else None + async def __receive_packet(self) -> Optional[Dict[str, Any]]: + """ + Receives a stream of packets sent from the Gateway in an async process. + + :return: The packet stream. + :rtype: Optional[Dict[str, Any]] + """ + + while True: + packet: WSMessage = await self._client.receive() + + if packet.type == WSMsgType.CLOSE: + log.debug(f"Disconnecting from gateway = {packet.data}::{packet.extra}") + + if packet.data >= 4000: + # This means that the error code is 4000+, which may signify Discord-provided error codes. + raise LibraryException(packet.data) # Works because gateway!! + # We need to work on __aexit__ / __aentry__ though, since we're not using the ws request manager. + + await self._reconnect(packet.data != 1000, packet.data) + + elif packet.type == WSMsgType.CLOSED: + # We need to wait/reconnect depending about other event holders. + + if not self.__closed.is_set(): + await self.__closed.wait() + else: + await self._reconnect(True) + + elif packet.type == WSMsgType.CLOSING: + await self.__closed.wait() + + return loads(packet.data) if packet and isinstance(packet.data, str) else None + async def _send_packet(self, data: Dict[str, Any]) -> None: """ Sends a packet to the Gateway. From 626732356fe89bf792a562790cd0c73c390c1fe1 Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Thu, 18 Aug 2022 13:02:34 -0400 Subject: [PATCH 03/13] feat!: Cache websocket url, completely redo WS client process/instantiation and client login process, implement ratelimiter to GW process --- interactions/api/gateway/client.py | 246 ++++++++++++++++++++++---- interactions/api/gateway/ratelimit.py | 8 +- interactions/client/bot.py | 28 ++- 3 files changed, 244 insertions(+), 38 deletions(-) diff --git a/interactions/api/gateway/client.py b/interactions/api/gateway/client.py index 31c24c06f..7a5365e43 100644 --- a/interactions/api/gateway/client.py +++ b/interactions/api/gateway/client.py @@ -4,14 +4,16 @@ from json import dumps, loads from asyncio import ( + FIRST_COMPLETED, Event, Lock, Task, TimeoutError, - ensure_future, + create_task, get_event_loop, get_running_loop, new_event_loop, + wait, wait_for, ) from sys import platform, version_info @@ -101,7 +103,10 @@ class WebSocketClient: "_last_send", "_last_ack", "resume_url", + "ws_url", "reconnect_lock", + "_closing_lock", # document closing for gateway + "__stopping", ) def __init__( @@ -142,6 +147,7 @@ def __init__( "timeout": 60, "autoclose": False, "compress": 0, + "headers": {"User-Agent": self._http._req._headers["User-Agent"]}, } self._intents: Intents = intents @@ -160,8 +166,13 @@ def __init__( self._last_ack: float = perf_counter() self.resume_url: Optional[str] = None + self.ws_url: Optional[str] = None self.reconnect_lock = Lock(loop=self._loop) if version_info < (3, 10) else Lock() + self._closing_lock = Event(loop=self._loop) if version_info < (3, 10) else Event() + + self.__stopping: Optional[Task] = None + @property def latency(self) -> float: """ @@ -169,14 +180,31 @@ def latency(self) -> float: """ return self._last_ack - self._last_send + async def run_heartbeat(self) -> None: + """Controls the heartbeat manager. Do note that this shouldn't be executed by outside processes.""" + + if self.__heartbeat_event.is_set(): # resets task of heartbeat event mgr loop + # Because we're hardresetting the process every instance its called, also helps with recursion + self.__heartbeat_event.clear() + + if not self.__heartbeater.event.is_set(): # resets task of heartbeat ack event + self.__heartbeater.event.set() + + try: + await self._manage_heartbeat() + except Exception: + self._closing_lock.set() + log.error("Heartbeater exception: ", exc_info=True) + async def _manage_heartbeat(self) -> None: """Manages the heartbeat loop.""" log.debug(f"Sending heartbeat every {self.__heartbeater.delay} seconds...") while not self.__heartbeat_event.is_set(): + log.debug("Sending heartbeat...") if not self.__heartbeater.event.is_set(): log.debug("HEARTBEAT_ACK missing, reconnecting...") - await self.__restart() # resume here. + await self._reconnect(True) # resume here. self.__heartbeater.event.clear() await self.__heartbeat() @@ -190,6 +218,7 @@ async def _manage_heartbeat(self) -> None: continue # Then we can check heartbeat ack this way and then like it autorestarts. else: return # break loop because something went wrong. + log.debug("Somehow it reaches here (if in process please fix)") async def __restart(self) -> None: """Restart the client's connection and heartbeat with the Gateway.""" @@ -238,6 +267,97 @@ async def _establish_connection( await self._handle_connection(stream, shard, presence) + async def run(self) -> None: + """ + Handles the client's connection with the Gateway. + """ + + url = await self._http.get_gateway() + self.ws_url = url + self._client = await self._http._req._session.ws_connect(url, **self._options) + + data = await self.__receive_packet(True) # First data is the hello packet. + + self.__heartbeater.delay = data["d"]["heartbeat_interval"] + + self.__task = create_task(self.run_heartbeat()) + + await self.__identify(self.__shard, self.__presence) + + self.__closed.set() + self.__heartbeater.event.set() + + while True: + if self.__stopping is None: + self.__stopping = create_task(self._closing_lock.wait()) + _receive = create_task(self.__receive_packet()) + + done, _ = await wait({self.__stopping, _receive}, return_when=FIRST_COMPLETED) + # Using asyncio.wait to find which one reaches first, when its *closed* or when a message is + # *received* + + if _receive in done: + msg = await _receive + else: + await self.__stopping + _receive.cancel() + return + + await self._handle_stream(msg) + + async def _handle_stream(self, stream: Dict[str, Any]): + """ + Parses raw stream data recieved from the Gateway, including Gateway opcodes and events. + + .. note :: + This should never be called directly. + + :param stream: The packet stream to handle. + :type stream: Dict[str, Any] + """ + op: Optional[int] = stream.get("op") + event: Optional[str] = stream.get("t") + data: Optional[Dict[str, Any]] = stream.get("d") + + seq: Optional[str] = stream.get("s") + if seq: + self.sequence = seq + + if op != OpCodeType.DISPATCH: + log.debug(data) + + if op == OpCodeType.HEARTBEAT: + await self.__heartbeat() + if op == OpCodeType.HEARTBEAT_ACK: + self._last_ack = perf_counter() + log.debug("HEARTBEAT_ACK") + self.__heartbeater.event.set() + + if op == OpCodeType.INVALIDATE_SESSION: + log.debug("INVALID_SESSION") + self.ready.clear() + await self._reconnect(bool(data)) + + if op == OpCodeType.RECONNECT: + log.debug("RECONNECT") + await self._reconnect(True) + + elif event == "RESUMED": + log.debug(f"RESUMED (session_id: {self.session_id}, seq: {self.sequence})") + elif event == "READY": + self.ready.set() + self._dispatch.dispatch("on_ready") + self._ready = data + self.session_id = data["session_id"] + self.resume_url = data["resume_gateway_url"] + if not self.__started: + self.__started = True + self._dispatch.dispatch("on_start") + log.debug(f"READY (session_id: {self.session_id}, seq: {self.sequence})") + else: + log.debug(f"{event}: {str(data).encode('utf-8')}") + self._dispatch_event(event, data) + async def _handle_connection( self, stream: Dict[str, Any], @@ -266,48 +386,58 @@ async def _handle_connection( log.debug(data) if op == OpCodeType.HELLO: - self.__heartbeater.delay = data["heartbeat_interval"] - self.__heartbeater.event.set() - - if self.__task: - self.__task.cancel() # so we can reduce redundant heartbeat bg tasks. - - self.__task = ensure_future(self._manage_heartbeat()) - - if not self.session_id: - await self.__identify(shard, presence) - else: - await self.__resume() + ... + # self.__heartbeater.delay = data["heartbeat_interval"] + # self.__heartbeater.event.set() + # + # if self.__task: + # self.__task.cancel() # so we can reduce redundant heartbeat bg tasks. + # + # self.__task = ensure_future(self._manage_heartbeat()) + # + # if not self.session_id: + # await self.__identify(shard, presence) + # else: + # await self.__resume() if op == OpCodeType.HEARTBEAT: await self.__heartbeat() if op == OpCodeType.HEARTBEAT_ACK: self._last_ack = perf_counter() log.debug("HEARTBEAT_ACK") self.__heartbeater.event.set() - if op in (OpCodeType.INVALIDATE_SESSION, OpCodeType.RECONNECT): - log.debug("INVALID_SESSION/RECONNECT") - - # if data and op != OpCodeType.RECONNECT: - # self.session_id = None - # self.sequence = None - # self._closed = True - if not bool(data) and op == OpCodeType.INVALIDATE_SESSION: - self.session_id = None - - await self.__restart() + if op == OpCodeType.INVALIDATE_SESSION: + log.debug("INVALID_SESSION") + await self._reconnect(bool(data)) + + if op == OpCodeType.RECONNECT: + log.debug("RECONNECT") + await self._reconnect(True) + + # if op in (OpCodeType.INVALIDATE_SESSION, OpCodeType.RECONNECT): + # log.debug("INVALID_SESSION/RECONNECT") + # + # # if data and op != OpCodeType.RECONNECT: + # # self.session_id = None + # # self.sequence = None + # # self._closed = True + # + # if not bool(data) and op == OpCodeType.INVALIDATE_SESSION: + # self.session_id = None + # + # await self.__restart() elif event == "RESUMED": log.debug(f"RESUMED (session_id: {self.session_id}, seq: {self.sequence})") elif event == "READY": + self.ready.set() + self._dispatch.dispatch("on_ready") self._ready = data self.session_id = data["session_id"] - self._dispatch.dispatch("on_ready") self.resume_url = data["resume_gateway_url"] if not self.__started: self.__started = True self._dispatch.dispatch("on_start") log.debug(f"READY (session_id: {self.session_id}, seq: {self.sequence})") - self.ready.set() else: log.debug(f"{event}: {str(data).encode('utf-8')}") self._dispatch_event(event, data) @@ -696,6 +826,8 @@ async def _reconnect(self, to_resume: bool, code: Optional[int] = 1012) -> None: Restart the client's connection and heartbeat with the Gateway. """ + self._ready.clear() + async with self.reconnect_lock: self.__closed.clear() @@ -705,13 +837,30 @@ async def _reconnect(self, to_resume: bool, code: Optional[int] = 1012) -> None: self._client = None if not to_resume: - self.session_id = None - url = self._http.get_gateway() + url = self.ws_url if self.ws_url else await self._http.get_gateway() else: + log.info("resuming...") url = self.resume_url self._client = await self._http._req._session.ws_connect(url, **self._options) + data = await self.__receive_packet(True) # First data is the hello packet. + + self.__heartbeater.delay = data["d"]["heartbeat_interval"] + + if self.__task: + log.debug("Heartbeat manager reset...") + self.__task.cancel() + if self.__heartbeat_event.is_set(): + self.__heartbeat_event.clear() # Because we're hardresetting the process + + self.__task = create_task(self.run_heartbeat()) + + if not to_resume: + await self.__identify(self.__shard, self.__presence) + else: + await self.__resume() + self.__closed.set() self.__heartbeat_event.set() @@ -739,7 +888,7 @@ async def __receive_packet_stream(self) -> Optional[Union[Dict[str, Any], WSMess return loads(packet.data) if packet and isinstance(packet.data, str) else None - async def __receive_packet(self) -> Optional[Dict[str, Any]]: + async def __receive_packet(self, ignore_lock: bool = False) -> Optional[Dict[str, Any]]: """ Receives a stream of packets sent from the Gateway in an async process. @@ -748,6 +897,11 @@ async def __receive_packet(self) -> Optional[Dict[str, Any]]: """ while True: + + if not ignore_lock: + # meaning if we're reconnecting or something because of tasks + await self.__closed.wait() + packet: WSMessage = await self._client.receive() if packet.type == WSMsgType.CLOSE: @@ -758,20 +912,44 @@ async def __receive_packet(self) -> Optional[Dict[str, Any]]: raise LibraryException(packet.data) # Works because gateway!! # We need to work on __aexit__ / __aentry__ though, since we're not using the ws request manager. + if ignore_lock: + raise LibraryException( + message="Discord unexpectedly wants to close the WS on receiving by force.", + severity=50, + ) + await self._reconnect(packet.data != 1000, packet.data) + continue elif packet.type == WSMsgType.CLOSED: # We need to wait/reconnect depending about other event holders. + if ignore_lock: + raise LibraryException( + message="Discord unexpectedly closed on receiving by force.", severity=50 + ) + if not self.__closed.is_set(): await self.__closed.wait() + + # Edge case on force reconnecting if we dont else: await self._reconnect(True) elif packet.type == WSMsgType.CLOSING: + + if ignore_lock: + raise LibraryException( + message="Discord unexpectedly closing on receiving by force.", severity=50 + ) + await self.__closed.wait() + continue - return loads(packet.data) if packet and isinstance(packet.data, str) else None + if packet.data is None: + continue # We just loop it over because it could just be processing something. + + return loads(packet.data) if isinstance(packet.data, str) else None async def _send_packet(self, data: Dict[str, Any]) -> None: """ @@ -782,7 +960,11 @@ async def _send_packet(self, data: Dict[str, Any]) -> None: """ _data = dumps(data) if isinstance(data, dict) else data packet: str = _data.decode("utf-8") if isinstance(_data, bytes) else _data - await self._ratelimiter.block() + + if data["op"] != OpCodeType.HEARTBEAT: + # This is because the ratelimiter limits already accounts for this. + await self._ratelimiter.block() + self._last_send = perf_counter() await self._client.send_str(packet) log.debug(packet) diff --git a/interactions/api/gateway/ratelimit.py b/interactions/api/gateway/ratelimit.py index de58c6fb3..79ae67819 100644 --- a/interactions/api/gateway/ratelimit.py +++ b/interactions/api/gateway/ratelimit.py @@ -13,10 +13,10 @@ class WSRateLimit: .. note :: While the docs state that the Gateway ratelimits are 120/60 (120 requests per 60 seconds), - this ratelimit offsets to 110 instead of 120 for room. + this ratelimit offsets to 115 instead of 120 for room. :ivar Lock lock: The gateway Lock object. - :ivar int max: The upper limit of the ratelimit. Defaults to `110` seconds. + :ivar int max: The upper limit of the ratelimit. Defaults to `115` seconds. :ivar int remaining: How many requests are left per ``per_second``. This is automatically decremented and reset. :ivar float current_limit: When this cooldown session began. This is defined automatically. :ivar float per_second: A constant denoting how many requests can be done per unit of seconds. (i.e., per 60 seconds, per 45, etc.) @@ -24,9 +24,9 @@ class WSRateLimit: def __init__(self, loop=Optional[asyncio.AbstractEventLoop]): self.lock = asyncio.Lock(loop=loop) if version_info < (3, 10) else asyncio.Lock() - # To conserve timings, we need to do 110/60 + # To conserve timings, we need to do 115/60 - self.max = self.remaining = 110 + self.max = self.remaining = 115 self.per_second = 60.0 self.current_limit = 0.0 diff --git a/interactions/client/bot.py b/interactions/client/bot.py index 6161b659b..5399a1152 100644 --- a/interactions/client/bot.py +++ b/interactions/client/bot.py @@ -1,6 +1,7 @@ import contextlib import re import sys +import traceback from asyncio import AbstractEventLoop, CancelledError, get_event_loop, iscoroutinefunction from functools import wraps from importlib import import_module @@ -392,8 +393,31 @@ async def _ready(self) -> None: async def _login(self) -> None: """Makes a login with the Discord API.""" - while not self._websocket._closed: - await self._websocket._establish_connection(self._shards, self._presence) + # while not self._websocket._closed: + # await self._websocket._establish_connection(self._shards, self._presence) + + try: + await self._websocket.run() + except Exception as e: + log.error("".join(traceback.format_exception(type(e), e, e.__traceback__))) + + if self._websocket._closing_lock.is_set(): + # signal for closing. + + try: + if self._websocket.__task is not None: + self._websocket.__heartbeat_event.set() + try: + await self._websocket.__task # Wait for the keep-alive handler to finish so we can discard it gracefully + finally: + self._websocket.__task = None + finally: # then the overall WS client + if self._websocket._client is not None: + # This needs to be properly closed + try: + await self._websocket._client.close(code=1000) + finally: + self._websocket._client = None async def wait_until_ready(self) -> None: """Helper method that waits until the websocket is ready.""" From fe5c9a0dbb3ae6e81815f31096ae746571eb8da1 Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Thu, 18 Aug 2022 13:19:51 -0400 Subject: [PATCH 04/13] docs, chore: Remove unused code/imports/log prints, optimise docs/slots --- interactions/api/gateway/client.py | 175 ++--------------------------- 1 file changed, 8 insertions(+), 167 deletions(-) diff --git a/interactions/api/gateway/client.py b/interactions/api/gateway/client.py index 7a5365e43..abfaf6f29 100644 --- a/interactions/api/gateway/client.py +++ b/interactions/api/gateway/client.py @@ -21,7 +21,6 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union from aiohttp import ClientWebSocketResponse, WSMessage, WSMsgType -from aiohttp.http import WS_CLOSED_MESSAGE, WS_CLOSING_MESSAGE from ...base import get_logger from ...client.enums import InteractionType, OptionType @@ -61,7 +60,7 @@ class WebSocketClient: :ivar WSRateLimit _ratelimiter: The websocket ratelimiter object. :ivar HTTPClient _http: The user-facing HTTP client. :ivar ClientWebSocketResponse _client: The WebSocket data of the connection. - :ivar bool _closed: Whether the connection has been closed or not. + :ivar Event __closed: Whether the connection has been closed or not. :ivar dict _options: The connection options made during connection. :ivar Intents _intents: The gateway intents used for connection. :ivar dict _ready: The contents of the application returned when ready. @@ -70,14 +69,17 @@ class WebSocketClient: :ivar Optional[List[Tuple[int]]] __shard: The shards used during connection. :ivar Optional[ClientPresence] __presence: The presence used in connection. :ivar Event ready: The ready state of the client as an ``asyncio.Event``. - :ivar Task __task: The closing task for ending connections. + :ivar Task __task: The task containing the heartbeat manager process. :ivar bool __started: Whether the client has started. :ivar Optional[str] session_id: The ID of the ongoing session. :ivar Optional[int] sequence: The sequence identifier of the ongoing session. :ivar float _last_send: The latest time of the last send_packet function call since connection creation, in seconds. :ivar float _last_ack: The latest time of the last ``HEARTBEAT_ACK`` event since connection creation, in seconds. :ivar Optional[str] resume_url: The Websocket ratelimit URL for resuming connections, if any. + :ivar Optional[str] ws_url: The Websocket URL for instantiating connections without resuming. :ivar Lock reconnect_lock: The lock used for reconnecting the client. + :ivar Lock _closing_lock: The lock used for closing the client. + :ivar Optional[Task] __stopping: The task containing stopping the client, if any. """ __slots__ = ( @@ -86,7 +88,6 @@ class WebSocketClient: "_ratelimiter", "_http", "_client", - "_closed", "__closed", # placeholder to work with variables atm. its event variant of "_closed" "_options", "_intents", @@ -105,7 +106,7 @@ class WebSocketClient: "resume_url", "ws_url", "reconnect_lock", - "_closing_lock", # document closing for gateway + "_closing_lock", "__stopping", ) @@ -140,7 +141,6 @@ def __init__( self._http: HTTPClient = HTTPClient(token) self._client: Optional["ClientWebSocketResponse"] = None - self._closed: bool = False self.__closed: Event = Event(loop=self._loop) if version_info < (3, 10) else Event() self._options: dict = { "max_msg_size": 1024**2, @@ -218,60 +218,14 @@ async def _manage_heartbeat(self) -> None: continue # Then we can check heartbeat ack this way and then like it autorestarts. else: return # break loop because something went wrong. - log.debug("Somehow it reaches here (if in process please fix)") - - async def __restart(self) -> None: - """Restart the client's connection and heartbeat with the Gateway.""" - if self.__task: - self.__task.cancel() - self._client = None # clear pending waits - self.__heartbeater.event.clear() - await self._establish_connection(self.__shard, self.__presence) - - async def _establish_connection( - self, - shard: Optional[List[Tuple[int]]] = MISSING, - presence: Optional[ClientPresence] = MISSING, - ) -> None: - """ - Establishes a client connection with the Gateway. - - :param shard?: The shards to establish a connection with. Defaults to ``None``. - :type shard?: Optional[List[Tuple[int]]] - :param presence: The presence to carry with. Defaults to ``None``. - :type presence: Optional[ClientPresence] - """ - self._client = None - self.__heartbeater.delay = 0.0 - self._closed = False - self._options["headers"] = {"User-Agent": self._http._req._headers["User-Agent"]} - url = self.resume_url if self.resume_url else await self._http.get_gateway() - - async with self._http._req._session.ws_connect(url, **self._options) as self._client: - self._closed = self._client.closed - - if self._closed: - await self._establish_connection(self.__shard, self.__presence) - - while not self._closed: - stream = await self.__receive_packet_stream - - if stream is None: - continue - if self._client is None or stream == WS_CLOSED_MESSAGE or stream == WSMsgType.CLOSE: - await self._establish_connection(self.__shard, self.__presence) - break - - if self._client.close_code in range(4010, 4014) or self._client.close_code == 4004: - raise LibraryException(self._client.close_code) - - await self._handle_connection(stream, shard, presence) async def run(self) -> None: """ Handles the client's connection with the Gateway. """ + # Credit to NAFF for inspiration for the Gateway logic. + url = await self._http.get_gateway() self.ws_url = url self._client = await self._http._req._session.ws_connect(url, **self._options) @@ -358,90 +312,6 @@ async def _handle_stream(self, stream: Dict[str, Any]): log.debug(f"{event}: {str(data).encode('utf-8')}") self._dispatch_event(event, data) - async def _handle_connection( - self, - stream: Dict[str, Any], - shard: Optional[List[Tuple[int]]] = MISSING, - presence: Optional[ClientPresence] = MISSING, - ) -> None: - """ - Handles the client's connection with the Gateway. - - :param stream: The packet stream to handle. - :type stream: Dict[str, Any] - :param shard?: The shards to establish a connection with. Defaults to ``None``. - :type shard?: Optional[List[Tuple[int]]] - :param presence: The presence to carry with. Defaults to ``None``. - :type presence: Optional[ClientPresence] - """ - op: Optional[int] = stream.get("op") - event: Optional[str] = stream.get("t") - data: Optional[Dict[str, Any]] = stream.get("d") - - seq: Optional[str] = stream.get("s") - if seq: - self.sequence = seq - - if op != OpCodeType.DISPATCH: - log.debug(data) - - if op == OpCodeType.HELLO: - ... - # self.__heartbeater.delay = data["heartbeat_interval"] - # self.__heartbeater.event.set() - # - # if self.__task: - # self.__task.cancel() # so we can reduce redundant heartbeat bg tasks. - # - # self.__task = ensure_future(self._manage_heartbeat()) - # - # if not self.session_id: - # await self.__identify(shard, presence) - # else: - # await self.__resume() - if op == OpCodeType.HEARTBEAT: - await self.__heartbeat() - if op == OpCodeType.HEARTBEAT_ACK: - self._last_ack = perf_counter() - log.debug("HEARTBEAT_ACK") - self.__heartbeater.event.set() - - if op == OpCodeType.INVALIDATE_SESSION: - log.debug("INVALID_SESSION") - await self._reconnect(bool(data)) - - if op == OpCodeType.RECONNECT: - log.debug("RECONNECT") - await self._reconnect(True) - - # if op in (OpCodeType.INVALIDATE_SESSION, OpCodeType.RECONNECT): - # log.debug("INVALID_SESSION/RECONNECT") - # - # # if data and op != OpCodeType.RECONNECT: - # # self.session_id = None - # # self.sequence = None - # # self._closed = True - # - # if not bool(data) and op == OpCodeType.INVALIDATE_SESSION: - # self.session_id = None - # - # await self.__restart() - elif event == "RESUMED": - log.debug(f"RESUMED (session_id: {self.session_id}, seq: {self.sequence})") - elif event == "READY": - self.ready.set() - self._dispatch.dispatch("on_ready") - self._ready = data - self.session_id = data["session_id"] - self.resume_url = data["resume_gateway_url"] - if not self.__started: - self.__started = True - self._dispatch.dispatch("on_start") - log.debug(f"READY (session_id: {self.session_id}, seq: {self.sequence})") - else: - log.debug(f"{event}: {str(data).encode('utf-8')}") - self._dispatch_event(event, data) - async def wait_until_ready(self) -> None: """Waits for the client to become ready according to the Gateway.""" await self.ready.wait() @@ -818,9 +688,6 @@ def __option_type_context(self, context: "_Context", type: int) -> dict: } return _resolved - async def restart(self) -> None: - await self.__restart() - async def _reconnect(self, to_resume: bool, code: Optional[int] = 1012) -> None: """ Restart the client's connection and heartbeat with the Gateway. @@ -839,7 +706,6 @@ async def _reconnect(self, to_resume: bool, code: Optional[int] = 1012) -> None: if not to_resume: url = self.ws_url if self.ws_url else await self._http.get_gateway() else: - log.info("resuming...") url = self.resume_url self._client = await self._http._req._session.ws_connect(url, **self._options) @@ -849,7 +715,6 @@ async def _reconnect(self, to_resume: bool, code: Optional[int] = 1012) -> None: self.__heartbeater.delay = data["d"]["heartbeat_interval"] if self.__task: - log.debug("Heartbeat manager reset...") self.__task.cancel() if self.__heartbeat_event.is_set(): self.__heartbeat_event.clear() # Because we're hardresetting the process @@ -864,30 +729,6 @@ async def _reconnect(self, to_resume: bool, code: Optional[int] = 1012) -> None: self.__closed.set() self.__heartbeat_event.set() - @property - async def __receive_packet_stream(self) -> Optional[Union[Dict[str, Any], WSMessage]]: - """ - Receives a stream of packets sent from the Gateway. - - :return: The packet stream. - :rtype: Optional[Dict[str, Any]] - """ - - packet: WSMessage = await self._client.receive() - - if packet == WSMsgType.CLOSE: - await self._client.close() - return packet - - elif packet == WS_CLOSED_MESSAGE: - return packet - - elif packet == WS_CLOSING_MESSAGE: - await self._client.close() - return WS_CLOSED_MESSAGE - - return loads(packet.data) if packet and isinstance(packet.data, str) else None - async def __receive_packet(self, ignore_lock: bool = False) -> Optional[Dict[str, Any]]: """ Receives a stream of packets sent from the Gateway in an async process. From 6ce97bbca21bc64d3456294dccd273877c87e32c Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Thu, 18 Aug 2022 13:57:19 -0400 Subject: [PATCH 05/13] docs, chore: Remove/refactor unused comments, document and typehint ratelimiter object --- interactions/api/gateway/ratelimit.py | 24 ++++++++++++++++++++---- interactions/client/bot.py | 5 ++--- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/interactions/api/gateway/ratelimit.py b/interactions/api/gateway/ratelimit.py index 79ae67819..5dc4f0d5b 100644 --- a/interactions/api/gateway/ratelimit.py +++ b/interactions/api/gateway/ratelimit.py @@ -22,21 +22,34 @@ class WSRateLimit: :ivar float per_second: A constant denoting how many requests can be done per unit of seconds. (i.e., per 60 seconds, per 45, etc.) """ - def __init__(self, loop=Optional[asyncio.AbstractEventLoop]): + def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): self.lock = asyncio.Lock(loop=loop) if version_info < (3, 10) else asyncio.Lock() # To conserve timings, we need to do 115/60 + # Also, credit to d.py for their ratelimiter inspiration self.max = self.remaining = 115 self.per_second = 60.0 self.current_limit = 0.0 - def is_ratelimited(self): + def is_ratelimited(self) -> bool: + """ + A function that's called whenever the websocket ratelimiter is ratelimited. + + :return: Whether it's ratelimited or not. + :rtype: bool + """ current = time() if current > self.current_limit + self.per_second: return False return self.remaining == 0 - def get_delay(self): + def get_delay(self) -> float: + """ + A function that calculates how long we need to wait for ratelimit to pass, if any. + + :return: How long to wait in seconds, if any. Defaults to ``0.0``. + :rtype: float + """ current = time() if current > self.current_limit + self.per_second: @@ -54,7 +67,10 @@ def get_delay(self): return 0.0 - async def block(self): + async def block(self) -> None: + """ + A function that uses the internal Lock to check for rate-limits and cooldown whenever necessary. + """ async with self.lock: if delta := self.get_delay(): log.warning(f"We are ratelimited. Please wait {delta} seconds...") diff --git a/interactions/client/bot.py b/interactions/client/bot.py index 5399a1152..b6ed41cd3 100644 --- a/interactions/client/bot.py +++ b/interactions/client/bot.py @@ -393,8 +393,6 @@ async def _ready(self) -> None: async def _login(self) -> None: """Makes a login with the Discord API.""" - # while not self._websocket._closed: - # await self._websocket._establish_connection(self._shards, self._presence) try: await self._websocket.run() @@ -408,7 +406,8 @@ async def _login(self) -> None: if self._websocket.__task is not None: self._websocket.__heartbeat_event.set() try: - await self._websocket.__task # Wait for the keep-alive handler to finish so we can discard it gracefully + # Wait for the keep-alive handler to finish so we can discard it gracefully + await self._websocket.__task finally: self._websocket.__task = None finally: # then the overall WS client From 8c2ba3d2fd36b69cc20e0722dc07e5fc3a7c224b Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Thu, 18 Aug 2022 14:08:28 -0400 Subject: [PATCH 06/13] docs: Fix misprint on log debug division --- interactions/api/gateway/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/interactions/api/gateway/client.py b/interactions/api/gateway/client.py index abfaf6f29..ffad2bf15 100644 --- a/interactions/api/gateway/client.py +++ b/interactions/api/gateway/client.py @@ -198,7 +198,7 @@ async def run_heartbeat(self) -> None: async def _manage_heartbeat(self) -> None: """Manages the heartbeat loop.""" - log.debug(f"Sending heartbeat every {self.__heartbeater.delay} seconds...") + log.debug(f"Sending heartbeat every {self.__heartbeater.delay / 1000} seconds...") while not self.__heartbeat_event.is_set(): log.debug("Sending heartbeat...") From 7fb6b81f0202b7b020343dddb0f394adddc88577 Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Thu, 18 Aug 2022 14:36:36 -0400 Subject: [PATCH 07/13] feat: Implement a _stop() client function. --- interactions/api/gateway/client.py | 2 +- interactions/client/bot.py | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/interactions/api/gateway/client.py b/interactions/api/gateway/client.py index ffad2bf15..0858fdc2e 100644 --- a/interactions/api/gateway/client.py +++ b/interactions/api/gateway/client.py @@ -807,8 +807,8 @@ async def _send_packet(self, data: Dict[str, Any]) -> None: await self._ratelimiter.block() self._last_send = perf_counter() - await self._client.send_str(packet) log.debug(packet) + await self._client.send_str(packet) async def __identify( self, shard: Optional[List[Tuple[int]]] = None, presence: Optional[ClientPresence] = None diff --git a/interactions/client/bot.py b/interactions/client/bot.py index b6ed41cd3..0879dfb17 100644 --- a/interactions/client/bot.py +++ b/interactions/client/bot.py @@ -1,7 +1,6 @@ import contextlib import re import sys -import traceback from asyncio import AbstractEventLoop, CancelledError, get_event_loop, iscoroutinefunction from functools import wraps from importlib import import_module @@ -391,13 +390,23 @@ async def _ready(self) -> None: log.debug("Client is now ready.") await self._login() + async def _stop(self) -> None: + """Stops the websocket connection gracefully.""" + + log.debug("Shutting down the client....") + self._websocket.ready.clear() # Clears ready state. + self._websocket._closing_lock.set() # Toggles the "ready-to-shutdown" state for the bot. + # And subsequently, the processes will close itself. + + await self._http._req._session.close() # Closes the HTTP session associated with the client. + async def _login(self) -> None: """Makes a login with the Discord API.""" try: await self._websocket.run() except Exception as e: - log.error("".join(traceback.format_exception(type(e), e, e.__traceback__))) + log.exception(f"Websocket have raised an exception, closing because of: {e}") if self._websocket._closing_lock.is_set(): # signal for closing. From 8d9c27527bef0bedf47d5bafd326f511c2159bb3 Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Thu, 18 Aug 2022 14:47:50 -0400 Subject: [PATCH 08/13] fix: Supress error code 4001. --- interactions/api/gateway/client.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/interactions/api/gateway/client.py b/interactions/api/gateway/client.py index 0858fdc2e..bafc6e914 100644 --- a/interactions/api/gateway/client.py +++ b/interactions/api/gateway/client.py @@ -748,10 +748,15 @@ async def __receive_packet(self, ignore_lock: bool = False) -> Optional[Dict[str if packet.type == WSMsgType.CLOSE: log.debug(f"Disconnecting from gateway = {packet.data}::{packet.extra}") - if packet.data >= 4000: + if ( + packet.data >= 4000 and packet.data != 4001 + ): # suppress 4001 because of weird presence errors # This means that the error code is 4000+, which may signify Discord-provided error codes. - raise LibraryException(packet.data) # Works because gateway!! - # We need to work on __aexit__ / __aentry__ though, since we're not using the ws request manager. + + # However, we suppress 4001 because of weird presence errors with change_presence + # The payload is correct, and the presence object persists. /shrug + + raise LibraryException(packet.data) if ignore_lock: raise LibraryException( From 7a2fc2756593e7f51bad79fae69bfc40ef755b53 Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Thu, 18 Aug 2022 15:25:35 -0400 Subject: [PATCH 09/13] fix: Change to int value conversions for payload, readd afk attribute to ClientPresence object, revert suppress 4001 commit. --- interactions/api/gateway/client.py | 14 ++++++-------- interactions/api/models/presence.py | 3 +++ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/interactions/api/gateway/client.py b/interactions/api/gateway/client.py index bafc6e914..5e6193b11 100644 --- a/interactions/api/gateway/client.py +++ b/interactions/api/gateway/client.py @@ -748,9 +748,7 @@ async def __receive_packet(self, ignore_lock: bool = False) -> Optional[Dict[str if packet.type == WSMsgType.CLOSE: log.debug(f"Disconnecting from gateway = {packet.data}::{packet.extra}") - if ( - packet.data >= 4000 and packet.data != 4001 - ): # suppress 4001 because of weird presence errors + if packet.data >= 4000: # suppress 4001 because of weird presence errors # This means that the error code is 4000+, which may signify Discord-provided error codes. # However, we suppress 4001 because of weird presence errors with change_presence @@ -807,7 +805,7 @@ async def _send_packet(self, data: Dict[str, Any]) -> None: _data = dumps(data) if isinstance(data, dict) else data packet: str = _data.decode("utf-8") if isinstance(_data, bytes) else _data - if data["op"] != OpCodeType.HEARTBEAT: + if data["op"] != OpCodeType.HEARTBEAT.value: # This is because the ratelimiter limits already accounts for this. await self._ratelimiter.block() @@ -829,7 +827,7 @@ async def __identify( self.__shard = shard self.__presence = presence payload: dict = { - "op": OpCodeType.IDENTIFY, + "op": OpCodeType.IDENTIFY.value, "d": { "token": self._http.token, "intents": self._intents.value, @@ -853,7 +851,7 @@ async def __identify( async def __resume(self) -> None: """Sends a ``RESUME`` packet to the gateway.""" payload: dict = { - "op": OpCodeType.RESUME, + "op": OpCodeType.RESUME.value, "d": {"token": self._http.token, "seq": self.sequence, "session_id": self.session_id}, } log.debug(f"RESUMING: {payload}") @@ -862,7 +860,7 @@ async def __resume(self) -> None: async def __heartbeat(self) -> None: """Sends a ``HEARTBEAT`` packet to the gateway.""" - payload: dict = {"op": OpCodeType.HEARTBEAT, "d": self.sequence} + payload: dict = {"op": OpCodeType.HEARTBEAT.value, "d": self.sequence} await self._send_packet(payload) log.debug("HEARTBEAT") @@ -888,7 +886,7 @@ async def _update_presence(self, presence: ClientPresence) -> None: :param presence: The presence to change the bot to on identify. :type presence: ClientPresence """ - payload: dict = {"op": OpCodeType.PRESENCE, "d": presence._json} + payload: dict = {"op": OpCodeType.PRESENCE.value, "d": presence._json} await self._send_packet(payload) log.debug(f"UPDATE_PRESENCE: {presence._json}") self.__presence = presence diff --git a/interactions/api/models/presence.py b/interactions/api/models/presence.py index 9ca3d1fc5..3a387ad7b 100644 --- a/interactions/api/models/presence.py +++ b/interactions/api/models/presence.py @@ -181,3 +181,6 @@ def __attrs_post_init__(self): # If since is not provided by the developer... self.since = int(time.time() * 1000) if self.status == "idle" else 0 self._json["since"] = self.since + if not self._json.get("afk"): + self.afk = False + self._json["afk"] = False From e09cacae0f261473d471b6cb1418ad3265a025e2 Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Thu, 18 Aug 2022 19:30:18 -0400 Subject: [PATCH 10/13] docs, refactor: Tweak ratelimit functions to properties, redo ratelimit documentation and usage of logger exceptions --- interactions/api/gateway/client.py | 4 ++-- interactions/api/gateway/ratelimit.py | 16 +++++++++------- interactions/client/bot.py | 4 ++-- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/interactions/api/gateway/client.py b/interactions/api/gateway/client.py index 5e6193b11..f968b96de 100644 --- a/interactions/api/gateway/client.py +++ b/interactions/api/gateway/client.py @@ -194,7 +194,7 @@ async def run_heartbeat(self) -> None: await self._manage_heartbeat() except Exception: self._closing_lock.set() - log.error("Heartbeater exception: ", exc_info=True) + log.exception("Heartbeater exception.") async def _manage_heartbeat(self) -> None: """Manages the heartbeat loop.""" @@ -690,7 +690,7 @@ def __option_type_context(self, context: "_Context", type: int) -> dict: async def _reconnect(self, to_resume: bool, code: Optional[int] = 1012) -> None: """ - Restart the client's connection and heartbeat with the Gateway. + Restarts the client's connection and heartbeat with the Gateway. """ self._ready.clear() diff --git a/interactions/api/gateway/ratelimit.py b/interactions/api/gateway/ratelimit.py index 5dc4f0d5b..48ff77051 100644 --- a/interactions/api/gateway/ratelimit.py +++ b/interactions/api/gateway/ratelimit.py @@ -16,7 +16,7 @@ class WSRateLimit: this ratelimit offsets to 115 instead of 120 for room. :ivar Lock lock: The gateway Lock object. - :ivar int max: The upper limit of the ratelimit. Defaults to `115` seconds. + :ivar int max: The upper limit of the ratelimit in seconds. Defaults to `115`. :ivar int remaining: How many requests are left per ``per_second``. This is automatically decremented and reset. :ivar float current_limit: When this cooldown session began. This is defined automatically. :ivar float per_second: A constant denoting how many requests can be done per unit of seconds. (i.e., per 60 seconds, per 45, etc.) @@ -31,11 +31,12 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): self.per_second = 60.0 self.current_limit = 0.0 - def is_ratelimited(self) -> bool: + @property + def ratelimited(self) -> bool: """ - A function that's called whenever the websocket ratelimiter is ratelimited. + An attribute that reflects whenever the websocket ratelimiter is rate-limited. - :return: Whether it's ratelimited or not. + :return: Whether it's rate-limited or not. :rtype: bool """ current = time() @@ -43,7 +44,8 @@ def is_ratelimited(self) -> bool: return False return self.remaining == 0 - def get_delay(self) -> float: + @property + def delay(self) -> float: """ A function that calculates how long we need to wait for ratelimit to pass, if any. @@ -72,6 +74,6 @@ async def block(self) -> None: A function that uses the internal Lock to check for rate-limits and cooldown whenever necessary. """ async with self.lock: - if delta := self.get_delay(): - log.warning(f"We are ratelimited. Please wait {delta} seconds...") + if delta := self.delay: + log.warning(f"We are rate-limited. Please wait {round(delta, 2)} seconds...") await asyncio.sleep(delta) diff --git a/interactions/client/bot.py b/interactions/client/bot.py index 0879dfb17..d1c2f3ffd 100644 --- a/interactions/client/bot.py +++ b/interactions/client/bot.py @@ -405,8 +405,8 @@ async def _login(self) -> None: try: await self._websocket.run() - except Exception as e: - log.exception(f"Websocket have raised an exception, closing because of: {e}") + except Exception: + log.exception("Websocket have raised an exception, closing.") if self._websocket._closing_lock.is_set(): # signal for closing. From 9a75a0b3da99d4c3ed3b618fbeffa0996aa2a73f Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Thu, 18 Aug 2022 19:35:30 -0400 Subject: [PATCH 11/13] docs: Tweak ratelimit delay description --- interactions/api/gateway/ratelimit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/interactions/api/gateway/ratelimit.py b/interactions/api/gateway/ratelimit.py index 48ff77051..01e30466f 100644 --- a/interactions/api/gateway/ratelimit.py +++ b/interactions/api/gateway/ratelimit.py @@ -47,7 +47,7 @@ def ratelimited(self) -> bool: @property def delay(self) -> float: """ - A function that calculates how long we need to wait for ratelimit to pass, if any. + An attribute that reflects how long we need to wait for ratelimit to pass, if any. :return: How long to wait in seconds, if any. Defaults to ``0.0``. :rtype: float From 06a5787b60bfc0e6e71f021482c01a3cac75f8e4 Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Fri, 19 Aug 2022 09:09:29 -0400 Subject: [PATCH 12/13] fix: Fix changing client presence to IDLE without providing activities attribute --- interactions/api/models/presence.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/interactions/api/models/presence.py b/interactions/api/models/presence.py index 3a387ad7b..305774c64 100644 --- a/interactions/api/models/presence.py +++ b/interactions/api/models/presence.py @@ -182,5 +182,6 @@ def __attrs_post_init__(self): self.since = int(time.time() * 1000) if self.status == "idle" else 0 self._json["since"] = self.since if not self._json.get("afk"): - self.afk = False - self._json["afk"] = False + self.afk = self._json["afk"] = False + if not self._json.get("activities"): + self.activities = self._json["activities"] = [] From 1e972a803ac0330492b34df847744fdb3cf331d4 Mon Sep 17 00:00:00 2001 From: DeltaXWizard <33706469+deltaxwizard@users.noreply.github.com> Date: Fri, 19 Aug 2022 09:20:24 -0400 Subject: [PATCH 13/13] refactor!: Rename __task attribute to _task --- interactions/api/gateway/client.py | 14 +++++++------- interactions/client/bot.py | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/interactions/api/gateway/client.py b/interactions/api/gateway/client.py index f968b96de..ef4cfc698 100644 --- a/interactions/api/gateway/client.py +++ b/interactions/api/gateway/client.py @@ -69,7 +69,7 @@ class WebSocketClient: :ivar Optional[List[Tuple[int]]] __shard: The shards used during connection. :ivar Optional[ClientPresence] __presence: The presence used in connection. :ivar Event ready: The ready state of the client as an ``asyncio.Event``. - :ivar Task __task: The task containing the heartbeat manager process. + :ivar Task _task: The task containing the heartbeat manager process. :ivar bool __started: Whether the client has started. :ivar Optional[str] session_id: The ID of the ongoing session. :ivar Optional[int] sequence: The sequence identifier of the ongoing session. @@ -95,7 +95,7 @@ class WebSocketClient: "__heartbeater", "__shard", "__presence", - "__task", + "_task", "__heartbeat_event", "__started", "session_id", @@ -154,7 +154,7 @@ def __init__( self.__shard: Optional[List[Tuple[int]]] = None self.__presence: Optional[ClientPresence] = None - self.__task: Optional[Task] = None + self._task: Optional[Task] = None self.__heartbeat_event = Event(loop=self._loop) if version_info < (3, 10) else Event() self.__started: bool = False @@ -234,7 +234,7 @@ async def run(self) -> None: self.__heartbeater.delay = data["d"]["heartbeat_interval"] - self.__task = create_task(self.run_heartbeat()) + self._task = create_task(self.run_heartbeat()) await self.__identify(self.__shard, self.__presence) @@ -714,12 +714,12 @@ async def _reconnect(self, to_resume: bool, code: Optional[int] = 1012) -> None: self.__heartbeater.delay = data["d"]["heartbeat_interval"] - if self.__task: - self.__task.cancel() + if self._task: + self._task.cancel() if self.__heartbeat_event.is_set(): self.__heartbeat_event.clear() # Because we're hardresetting the process - self.__task = create_task(self.run_heartbeat()) + self._task = create_task(self.run_heartbeat()) if not to_resume: await self.__identify(self.__shard, self.__presence) diff --git a/interactions/client/bot.py b/interactions/client/bot.py index d1c2f3ffd..06079a4b6 100644 --- a/interactions/client/bot.py +++ b/interactions/client/bot.py @@ -412,13 +412,13 @@ async def _login(self) -> None: # signal for closing. try: - if self._websocket.__task is not None: + if self._websocket._task is not None: self._websocket.__heartbeat_event.set() try: # Wait for the keep-alive handler to finish so we can discard it gracefully - await self._websocket.__task + await self._websocket._task finally: - self._websocket.__task = None + self._websocket._task = None finally: # then the overall WS client if self._websocket._client is not None: # This needs to be properly closed