diff --git a/hathor/websocket/factory.py b/hathor/websocket/factory.py index b96dcbdff..212b327eb 100644 --- a/hathor/websocket/factory.py +++ b/hathor/websocket/factory.py @@ -333,13 +333,19 @@ def subscribe_address(self, connection: HathorAdminWebsocketProtocol, address: s if self.max_subs_addrs_conn is not None and len(subs) >= self.max_subs_addrs_conn: return False, f'Reached maximum number of subscribed addresses ({self.max_subs_addrs_conn}).' - elif self.max_subs_addrs_empty is not None and ( - self.address_index and _count_empty(subs, self.address_index) >= self.max_subs_addrs_empty - ): + empty_addresses: set[str] = connection.empty_addresses + if ( + self.max_subs_addrs_empty is not None + and self.address_index + and len(empty_addresses) >= self.max_subs_addrs_empty + and self._update_and_count_empty(empty_addresses) >= self.max_subs_addrs_empty + ): return False, f'Reached maximum number of subscribed empty addresses ({self.max_subs_addrs_empty}).' self.address_connections[address].add(connection) connection.subscribed_to.add(address) + if self.address_index and self.address_index.is_address_empty(address): + connection.empty_addresses.add(address) return True, '' def _handle_unsubscribe_address(self, connection: HathorAdminWebsocketProtocol, message: dict[Any, Any]) -> None: @@ -347,6 +353,7 @@ def _handle_unsubscribe_address(self, connection: HathorAdminWebsocketProtocol, addr = message['address'] if addr in self.address_connections and connection in self.address_connections[addr]: connection.subscribed_to.remove(addr) + connection.empty_addresses.discard(addr) self._remove_connection_from_address_dict(connection, addr) # Reply back to the client payload = json_dumpb({'type': 'unsubscribe_address', 'success': True}) @@ -372,7 +379,13 @@ def on_client_close(self, connection: HathorAdminWebsocketProtocol) -> None: for address in connection.subscribed_to: self._remove_connection_from_address_dict(connection, address) + def _update_and_count_empty(self, empty_addresses: set[str]) -> int: + """Removes non-empty addresses from the given set and returns the updated count.""" + assert self.address_index is not None + to_remove = set() + for addr in empty_addresses: + if not self.address_index.is_address_empty(addr): + to_remove.add(addr) -def _count_empty(addresses: set[str], address_index: AddressIndex) -> int: - """ Count how many of the addresses given are empty (have no outputs).""" - return sum(1 for addr in addresses if address_index.is_address_empty(addr)) + empty_addresses.difference_update(to_remove) + return len(empty_addresses) diff --git a/hathor/websocket/protocol.py b/hathor/websocket/protocol.py index 5f173151e..80c5e2328 100644 --- a/hathor/websocket/protocol.py +++ b/hathor/websocket/protocol.py @@ -56,6 +56,7 @@ def __init__(self, super().__init__() self.subscribed_to: set[str] = set() + self.empty_addresses: set[str] = set() # Enable/disable history streaming for this connection. self.is_history_streaming_enabled = is_history_streaming_enabled