Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion hathor/builder/resources_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,11 @@ def create_resources(self) -> server.Site:

# Websocket resource
assert self.manager.tx_storage.indexes is not None
ws_factory = HathorAdminWebsocketFactory(metrics=self.manager.metrics,
ws_factory = HathorAdminWebsocketFactory(manager=self.manager,
metrics=self.manager.metrics,
address_index=self.manager.tx_storage.indexes.addresses)
if self._args.disable_ws_history_streaming:
ws_factory.disable_history_streaming()
ws_factory.start()
root.putChild(b'ws', WebSocketResource(ws_factory))

Expand Down
2 changes: 2 additions & 0 deletions hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ def create_parser(cls) -> ArgumentParser:
help='Launch embedded IPython kernel for remote debugging')
parser.add_argument('--log-vertex-bytes', action='store_true',
help='Log tx bytes for debugging')
parser.add_argument('--disable-ws-history-streaming', action='store_true',
help='Disable websocket history streaming API')
return parser

def prepare(self, *, register_resources: bool = True) -> None:
Expand Down
1 change: 1 addition & 0 deletions hathor/cli/run_node_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,4 @@ class RunNodeArgs(BaseModel, extra=Extra.allow):
x_ipython_kernel: bool
nano_testnet: bool
log_vertex_bytes: bool
disable_ws_history_streaming: bool
4 changes: 4 additions & 0 deletions hathor/wallet/hd_wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ def generate_new_key(self, index):
new_key = self.chain_key.subkey(index)
self._key_generated(new_key, index)

def get_xpub(self) -> str:
    """Return the wallet's extended public key (xpub) after derivation."""
    chain_key = self.chain_key
    return chain_key.as_text(as_private=False)

def _key_generated(self, key, index):
""" Add generated key to self.keys and set last_generated_index

Expand Down
27 changes: 27 additions & 0 deletions hathor/websocket/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from hathor.exception import HathorError


class InvalidXPub(HathorError):
    """Raised when a string cannot be parsed as a valid extended public key (xpub)."""


class LimitExceeded(HathorError):
    """Raised when a configured limit is exceeded (e.g. the maximum number of pending addresses)."""


class InvalidAddress(HathorError):
    """Raised when an address fails validation (e.g. it does not have the expected length)."""
73 changes: 38 additions & 35 deletions hathor/websocket/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from collections import defaultdict, deque
from typing import Any, Optional, Union
from typing import Any, Optional

from autobahn.exception import Disconnected
from autobahn.twisted.websocket import WebSocketServerFactory
Expand All @@ -22,11 +22,12 @@

from hathor.conf import HathorSettings
from hathor.indexes import AddressIndex
from hathor.manager import HathorManager
from hathor.metrics import Metrics
from hathor.p2p.rate_limiter import RateLimiter
from hathor.pubsub import EventArguments, HathorEvents
from hathor.reactor import get_global_reactor
from hathor.util import json_dumpb, json_loadb, json_loads
from hathor.util import json_dumpb
from hathor.websocket.protocol import HathorAdminWebsocketProtocol

settings = HathorSettings()
Expand Down Expand Up @@ -83,18 +84,25 @@ class HathorAdminWebsocketFactory(WebSocketServerFactory):
max_subs_addrs_empty: Optional[int] = settings.WS_MAX_SUBS_ADDRS_EMPTY

def buildProtocol(self, addr):
return self.protocol(self)
return self.protocol(self, is_history_streaming_enabled=self.is_history_streaming_enabled)

def __init__(self, metrics: Optional[Metrics] = None, address_index: Optional[AddressIndex] = None):
def __init__(self,
manager: HathorManager,
metrics: Optional[Metrics] = None,
address_index: Optional[AddressIndex] = None):
"""
:param metrics: If not given, a new one is created.
:type metrics: :py:class:`hathor.metrics.Metrics`
"""
self.manager = manager
self.reactor = get_global_reactor()
# Opened websocket connections so I can broadcast messages later
# It contains only connections that have finished handshaking.
self.connections: set[HathorAdminWebsocketProtocol] = set()

# Enable/disable history streaming over the websocket connection.
self.is_history_streaming_enabled: bool = True

# Websocket connection for each address
self.address_connections: defaultdict[str, set[HathorAdminWebsocketProtocol]] = defaultdict(set)
super().__init__()
Expand Down Expand Up @@ -129,6 +137,12 @@ def stop(self):
self._lc_send_metrics.stop()
self.is_running = False

def disable_history_streaming(self) -> None:
    """Disable history streaming on this factory and all open connections.

    Clears the factory-level flag, so protocols built afterwards start with
    streaming disabled, and propagates the change to every connected protocol.
    """
    self.is_history_streaming_enabled = False
    for conn in self.connections:
        # Bug fix: the original called self.disable_history_streaming() here,
        # which recurses infinitely and never reaches any connection. Each
        # connection must be told to disable its own streaming.
        conn.disable_history_streaming()

def _setup_rate_limit(self):
""" Set the limit of the RateLimiter and start the buffer deques with BUFFER_SIZE
"""
Expand Down Expand Up @@ -300,44 +314,33 @@ def process_deque(self, data_type):
data_type=data_type)
break

def handle_message(self, connection: HathorAdminWebsocketProtocol, data: Union[bytes, str]) -> None:
""" General message handler, detects type and deletages to specific handler."""
if isinstance(data, bytes):
message = json_loadb(data)
else:
message = json_loads(data)
# we only handle ping messages for now
if message['type'] == 'ping':
self._handle_ping(connection, message)
elif message['type'] == 'subscribe_address':
self._handle_subscribe_address(connection, message)
elif message['type'] == 'unsubscribe_address':
self._handle_unsubscribe_address(connection, message)

def _handle_ping(self, connection: HathorAdminWebsocketProtocol, message: dict[Any, Any]) -> None:
    """Answer a ping message with a simple {"type": "pong"} payload."""
    pong = json_dumpb({'type': 'pong'})
    connection.sendMessage(pong, False)

def _handle_subscribe_address(self, connection: HathorAdminWebsocketProtocol, message: dict[Any, Any]) -> None:
""" Handler for subscription to an address, consideirs subscription limits."""
addr: str = message['address']
address: str = message['address']
success, errmsg = self.subscribe_address(connection, address)
response = {
'type': 'subscribe_address',
'address': address,
'success': success,
}
if not success:
response['message'] = errmsg
connection.sendMessage(json_dumpb(response), False)

def subscribe_address(self, connection: HathorAdminWebsocketProtocol, address: str) -> tuple[bool, str]:
"""Subscribe an address to send real time updates to a websocket connection."""
subs: set[str] = connection.subscribed_to
if self.max_subs_addrs_conn is not None and len(subs) >= self.max_subs_addrs_conn:
payload = json_dumpb({'message': 'Reached maximum number of subscribed '
f'addresses ({self.max_subs_addrs_conn}).',
'type': 'subscribe_address', 'success': False})
return False, f'Reached maximum number of subscribed addresses ({self.max_subs_addrs_conn}).'

elif self.max_subs_addrs_empty is not None and (
self.address_index and _count_empty(subs, self.address_index) >= self.max_subs_addrs_empty
):
payload = json_dumpb({'message': 'Reached maximum number of subscribed '
f'addresses without output ({self.max_subs_addrs_empty}).',
'type': 'subscribe_address', 'success': False})
else:
self.address_connections[addr].add(connection)
connection.subscribed_to.add(addr)
payload = json_dumpb({'type': 'subscribe_address', 'success': True})
connection.sendMessage(payload, False)
return False, f'Reached maximum number of subscribed empty addresses ({self.max_subs_addrs_empty}).'

self.address_connections[address].add(connection)
connection.subscribed_to.add(address)
return True, ''

def _handle_unsubscribe_address(self, connection: HathorAdminWebsocketProtocol, message: dict[Any, Any]) -> None:
""" Handler for unsubscribing from an address, also removes address connection set if it ends up empty."""
Expand Down
151 changes: 151 additions & 0 deletions hathor/websocket/iterators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import deque
from collections.abc import AsyncIterable
from dataclasses import dataclass
from typing import AsyncIterator, Iterator, TypeAlias

from twisted.internet.defer import Deferred

from hathor.manager import HathorManager
from hathor.transaction import BaseTransaction
from hathor.types import AddressB58
from hathor.websocket.exception import InvalidAddress, InvalidXPub, LimitExceeded


@dataclass(frozen=True, slots=True)
class AddressItem:
    """A derived wallet address together with its derivation index."""
    index: int  # position of the address in the derivation sequence
    address: AddressB58


@dataclass(frozen=True, slots=True)
class VertexItem:
    """Wrapper that marks an item in an address-search stream as a vertex (transaction)."""
    vertex: BaseTransaction


class ManualAddressSequencer(AsyncIterable[AddressItem]):
    """An async iterable that yields addresses from a list. More addresses
    can be added while the iterator is being consumed.

    When the pending queue runs dry the iterator parks on a Deferred and is
    woken by `_resume_iter()` the next time `add_addresses()` is called.
    """

    # Expected length (in characters) of a base58 address string.
    ADDRESS_SIZE: int = 34
    # Maximum number of addresses allowed in the pending queue at once.
    MAX_PENDING_ADDRESSES_SIZE: int = 5_000

    def __init__(self) -> None:
        self.max_pending_addresses_size: int = self.MAX_PENDING_ADDRESSES_SIZE
        # FIFO queue of items still to be yielded by the iterator.
        self.pending_addresses: deque[AddressItem] = deque()
        # Deferred the iterator awaits on while the queue is empty; fired by
        # _resume_iter() when new items arrive (or the stream is closed).
        self.await_items: Deferred | None = None

        # Flag to mark when all addresses have been received so the iterator
        # can stop yielding after the pending list of addresses is empty.
        self._stop = False

    def _resume_iter(self) -> None:
        """Resume yielding addresses: wake the iterator if it is parked waiting for items."""
        if self.await_items is None:
            return
        # Only fire once; a Deferred must not be called back twice.
        if not self.await_items.called:
            self.await_items.callback(None)

    def add_addresses(self, addresses: list[AddressItem], last: bool) -> None:
        """Add more addresses to be yielded. If `last` is true, the iterator
        will stop when the pending list of items gets empty.

        Raises LimitExceeded if the queue would grow beyond
        `max_pending_addresses_size`, and InvalidAddress if any address does
        not have exactly ADDRESS_SIZE characters. On validation failure
        nothing is enqueued.
        """
        if len(self.pending_addresses) + len(addresses) > self.max_pending_addresses_size:
            raise LimitExceeded

        # Validate addresses.
        for item in addresses:
            if len(item.address) != self.ADDRESS_SIZE:
                raise InvalidAddress(item)

        self.pending_addresses.extend(addresses)
        if last:
            self._stop = True
        self._resume_iter()

    def __aiter__(self) -> AsyncIterator[AddressItem]:
        """Return an async iterator."""
        return self._async_iter()

    async def _async_iter(self) -> AsyncIterator[AddressItem]:
        """Internal method that implements the async iterator.

        Drains the pending queue, then either stops (if `_stop` was set) or
        parks on a fresh Deferred until `add_addresses()` resumes it.
        """
        while True:
            while self.pending_addresses:
                item = self.pending_addresses.popleft()
                yield item

            if self._stop:
                break

            self.await_items = Deferred()
            await self.await_items


def iter_xpub_addresses(xpub_str: str, *, first_index: int = 0) -> Iterator[AddressItem]:
    """Yield addresses derived from an xpub, starting at `first_index`, without end.

    Raises InvalidXPub if the string cannot be parsed as a BIP32 key.
    """
    from pycoin.networks.registry import network_for_netcode

    from hathor.wallet.hd_wallet import _register_pycoin_networks
    _register_pycoin_networks()
    network = network_for_netcode('htr')

    parsed_key = network.parse.bip32(xpub_str)
    if parsed_key is None:
        raise InvalidXPub(xpub_str)

    index = first_index
    while True:
        child = parsed_key.subkey(index)
        yield AddressItem(index, AddressB58(child.address()))
        index += 1


async def aiter_xpub_addresses(xpub: str, *, first_index: int = 0) -> AsyncIterator[AddressItem]:
    """Async wrapper over `iter_xpub_addresses`, yielding the same derived addresses."""
    for address_item in iter_xpub_addresses(xpub, first_index=first_index):
        yield address_item


AddressSearch: TypeAlias = AsyncIterator[AddressItem | VertexItem]


async def gap_limit_search(
    manager: HathorManager,
    address_iter: AsyncIterable[AddressItem],
    gap_limit: int
) -> AddressSearch:
    """An async iterator that yields addresses and vertices, stopping when the gap limit is reached.

    For each address it first yields the AddressItem, then a VertexItem for
    every transaction on that address. After `gap_limit` consecutive empty
    addresses the search stops.
    """
    indexes = manager.tx_storage.indexes
    assert indexes is not None
    assert indexes.addresses is not None
    consecutive_empty = 0
    async for address_item in address_iter:
        yield address_item  # AddressItem

        found_any = False
        for vertex_id in indexes.addresses.get_sorted_from_address(address_item.address):
            found_any = True
            yield VertexItem(manager.tx_storage.get_transaction(vertex_id))

        if found_any:
            # A non-empty address resets the gap counter.
            consecutive_empty = 0
        else:
            consecutive_empty += 1
            if consecutive_empty >= gap_limit:
                break
Loading