Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions hathor/websocket/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,20 +333,27 @@ def subscribe_address(self, connection: HathorAdminWebsocketProtocol, address: s
if self.max_subs_addrs_conn is not None and len(subs) >= self.max_subs_addrs_conn:
return False, f'Reached maximum number of subscribed addresses ({self.max_subs_addrs_conn}).'

elif self.max_subs_addrs_empty is not None and (
self.address_index and _count_empty(subs, self.address_index) >= self.max_subs_addrs_empty
):
empty_addresses: set[str] = connection.empty_addresses
if (
self.max_subs_addrs_empty is not None
and self.address_index
and len(empty_addresses) >= self.max_subs_addrs_empty
and self._update_and_count_empty(empty_addresses) >= self.max_subs_addrs_empty
):
return False, f'Reached maximum number of subscribed empty addresses ({self.max_subs_addrs_empty}).'

self.address_connections[address].add(connection)
connection.subscribed_to.add(address)
if self.address_index and self.address_index.is_address_empty(address):
connection.empty_addresses.add(address)
return True, ''

def _handle_unsubscribe_address(self, connection: HathorAdminWebsocketProtocol, message: dict[Any, Any]) -> None:
""" Handler for unsubscribing from an address, also removes address connection set if it ends up empty."""
addr = message['address']
if addr in self.address_connections and connection in self.address_connections[addr]:
connection.subscribed_to.remove(addr)
connection.empty_addresses.discard(addr)
self._remove_connection_from_address_dict(connection, addr)
# Reply back to the client
payload = json_dumpb({'type': 'unsubscribe_address', 'success': True})
Expand All @@ -372,7 +379,13 @@ def on_client_close(self, connection: HathorAdminWebsocketProtocol) -> None:
for address in connection.subscribed_to:
self._remove_connection_from_address_dict(connection, address)

def _update_and_count_empty(self, empty_addresses: set[str]) -> int:
"""Removes non-empty addresses from the given set and returns the updated count."""
assert self.address_index is not None
to_remove = set()
for addr in empty_addresses:
if not self.address_index.is_address_empty(addr):
to_remove.add(addr)

def _count_empty(addresses: set[str], address_index: AddressIndex) -> int:
""" Count how many of the addresses given are empty (have no outputs)."""
return sum(1 for addr in addresses if address_index.is_address_empty(addr))
empty_addresses.difference_update(to_remove)
return len(empty_addresses)
1 change: 1 addition & 0 deletions hathor/websocket/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(self,
super().__init__()

self.subscribed_to: set[str] = set()
self.empty_addresses: set[str] = set()

# Enable/disable history streaming for this connection.
self.is_history_streaming_enabled = is_history_streaming_enabled
Expand Down
Loading