diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index deef7d710..d570efe9f 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -34,6 +34,7 @@ from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager from hathor.manager import HathorManager from hathor.mining.cpu_mining_service import CpuMiningService +from hathor.p2p.entrypoint import Entrypoint from hathor.p2p.manager import ConnectionsManager from hathor.p2p.peer_id import PeerId from hathor.p2p.utils import discover_hostname, get_genesis_short_hash @@ -389,7 +390,8 @@ def create_manager(self, reactor: Reactor) -> HathorManager: p2p_manager.add_peer_discovery(DNSPeerDiscovery(dns_hosts)) if self._args.bootstrap: - p2p_manager.add_peer_discovery(BootstrapPeerDiscovery(self._args.bootstrap)) + entrypoints = [Entrypoint.parse(desc) for desc in self._args.bootstrap] + p2p_manager.add_peer_discovery(BootstrapPeerDiscovery(entrypoints)) if self._args.x_rocksdb_indexes: self.log.warn('--x-rocksdb-indexes is now the default, no need to specify it') diff --git a/hathor/metrics.py b/hathor/metrics.py index 64ee2bd08..92b73d3ad 100644 --- a/hathor/metrics.py +++ b/hathor/metrics.py @@ -252,7 +252,7 @@ def collect_peer_connection_metrics(self) -> None: continue metric = PeerConnectionMetrics( - connection_string=connection.connection_string if connection.connection_string else "", + connection_string=str(connection.entrypoint) if connection.entrypoint else "", peer_id=connection.peer.id, network=connection.network, received_messages=connection.metrics.received_messages, diff --git a/hathor/p2p/entrypoint.py b/hathor/p2p/entrypoint.py new file mode 100644 index 000000000..7d0f726b4 --- /dev/null +++ b/hathor/p2p/entrypoint.py @@ -0,0 +1,219 @@ +# Copyright 2024 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from enum import Enum +from urllib.parse import parse_qs, urlparse + +from twisted.internet.address import IPv4Address, IPv6Address +from twisted.internet.endpoints import TCP4ClientEndpoint +from twisted.internet.interfaces import IStreamClientEndpoint +from typing_extensions import Self + +from hathor.reactor import ReactorProtocol as Reactor +from hathor.types import Hash + + +class Protocol(Enum): + TCP = 'tcp' + + +class PeerId(Hash): + pass + + +@dataclass(frozen=True, slots=True) +class Entrypoint: + """Endpoint description (returned from DNS query, or received from the p2p network) may contain a peer-id.""" + + protocol: Protocol + host: str + port: int + peer_id: PeerId | None + + def __str__(self): + if self.peer_id is None: + return f'{self.protocol.value}://{self.host}:{self.port}' + else: + return f'{self.protocol.value}://{self.host}:{self.port}/?id={self.peer_id}' + + @classmethod + def parse(cls, description: str) -> Self: + """Parse endpoint description into an Entrypoint object. + + Examples: + + >>> str(Entrypoint.parse('tcp://127.0.0.1:40403/')) + 'tcp://127.0.0.1:40403' + + >>> id1 = 'c0f19299c2a4dcbb6613a14011ff07b63d6cb809e4cee25e9c1ccccdd6628696' + >>> Entrypoint.parse(f'tcp://127.0.0.1:40403/?id={id1}') + Entrypoint(protocol=, host='127.0.0.1', port=40403, \ +peer_id=PeerId('c0f19299c2a4dcbb6613a14011ff07b63d6cb809e4cee25e9c1ccccdd6628696')) + + >>> str(Entrypoint.parse(f'tcp://127.0.0.1:40403/?id={id1}')) + 'tcp://127.0.0.1:40403/?id=c0f19299c2a4dcbb6613a14011ff07b63d6cb809e4cee25e9c1ccccdd6628696' + + >>> Entrypoint.parse('tcp://127.0.0.1:40403') + Entrypoint(protocol=, host='127.0.0.1', port=40403, peer_id=None) + + >>> Entrypoint.parse('tcp://127.0.0.1:40403/') + Entrypoint(protocol=, host='127.0.0.1', port=40403, peer_id=None) + + >>> Entrypoint.parse('tcp://foo.bar.baz:40403/') + Entrypoint(protocol=, host='foo.bar.baz', port=40403, peer_id=None) + + >>> str(Entrypoint.parse('tcp://foo.bar.baz:40403/')) + 'tcp://foo.bar.baz:40403' + + >>> Entrypoint.parse('tcp://127.0.0.1:40403/?id=123') + Traceback (most recent call last): + ... + ValueError: non-hexadecimal number found in fromhex() arg at position 3 + + >>> Entrypoint.parse('tcp://127.0.0.1:4040f') + Traceback (most recent call last): + ... + ValueError: Port could not be cast to integer value as '4040f' + + >>> Entrypoint.parse('udp://127.0.0.1:40403/') + Traceback (most recent call last): + ... + ValueError: 'udp' is not a valid Protocol + + >>> Entrypoint.parse('tcp://127.0.0.1/') + Traceback (most recent call last): + ... + ValueError: expected a port + + >>> Entrypoint.parse('tcp://:40403/') + Traceback (most recent call last): + ... + ValueError: expected a host + + >>> Entrypoint.parse('tcp://127.0.0.1:40403/foo') + Traceback (most recent call last): + ... + ValueError: unexpected path: /foo + + >>> id2 = 'bc5119d47bb4ea7c19100bd97fb11f36970482108bd3d45ff101ee4f6bbec872' + >>> Entrypoint.parse(f'tcp://127.0.0.1:40403/?id={id1}&id={id2}') + Traceback (most recent call last): + ... + ValueError: unexpected id count: 2 + """ + url = urlparse(description) + protocol = Protocol(url.scheme) + host = url.hostname + if host is None: + raise ValueError('expected a host') + port = url.port + if port is None: + raise ValueError('expected a port') + if url.path not in {'', '/'}: + raise ValueError(f'unexpected path: {url.path}') + peer_id: PeerId | None = None + + if url.query: + query = parse_qs(url.query) + if 'id' in query: + ids = query['id'] + if len(ids) != 1: + raise ValueError(f'unexpected id count: {len(ids)}') + peer_id = PeerId(ids[0]) + + return cls(protocol, host, port, peer_id) + + @classmethod + def from_hostname_address(cls, hostname: str, address: IPv4Address | IPv6Address) -> Self: + return cls.parse(f'{address.type}://{hostname}:{address.port}') + + def to_client_endpoint(self, reactor: Reactor) -> IStreamClientEndpoint: + """This method generates a twisted client endpoint that has a .connect() method.""" + # XXX: currently we don't support IPv6, but when we do we have to decide between TCP4ClientEndpoint and + # TCP6ClientEndpoint, when the host is an IP address that is easy, but when it is a DNS hostname, we will not + # know which to use until we know which resource records it holds (A or AAAA) + return TCP4ClientEndpoint(reactor, self.host, self.port) + + def equals_ignore_peer_id(self, other: Self) -> bool: + """Compares `self` and `other` ignoring the `peer_id` fields of either. + + Examples: + + >>> ep1 = 'tcp://foo:111' + >>> ep2 = 'tcp://foo:111/?id=c0f19299c2a4dcbb6613a14011ff07b63d6cb809e4cee25e9c1ccccdd6628696' + >>> ep3 = 'tcp://foo:111/?id=bc5119d47bb4ea7c19100bd97fb11f36970482108bd3d45ff101ee4f6bbec872' + >>> ep4 = 'tcp://bar:111/?id=c0f19299c2a4dcbb6613a14011ff07b63d6cb809e4cee25e9c1ccccdd6628696' + >>> ep5 = 'tcp://foo:112/?id=c0f19299c2a4dcbb6613a14011ff07b63d6cb809e4cee25e9c1ccccdd6628696' + >>> Entrypoint.parse(ep1).equals_ignore_peer_id(Entrypoint.parse(ep2)) + True + >>> Entrypoint.parse(ep2).equals_ignore_peer_id(Entrypoint.parse(ep3)) + True + >>> Entrypoint.parse(ep1).equals_ignore_peer_id(Entrypoint.parse(ep4)) + False + >>> Entrypoint.parse(ep2).equals_ignore_peer_id(Entrypoint.parse(ep4)) + False + >>> Entrypoint.parse(ep2).equals_ignore_peer_id(Entrypoint.parse(ep5)) + False + """ + return (self.protocol, self.host, self.port) == (other.protocol, other.host, other.port) + + def peer_id_conflicts_with(self, other: Self) -> bool: + """Returns True if both self and other have a peer_id and they are different, returns False otherwise. + + This method ignores the host. Which is useful for catching the cases where both `self` and `other` have a + declared `peer_id` and they are not equal. + + >>> desc_no_pid = 'tcp://127.0.0.1:40403/' + >>> ep_no_pid = Entrypoint.parse(desc_no_pid) + >>> desc_pid1 = 'tcp://127.0.0.1:40403/?id=c0f19299c2a4dcbb6613a14011ff07b63d6cb809e4cee25e9c1ccccdd6628696' + >>> ep_pid1 = Entrypoint.parse(desc_pid1) + >>> desc_pid2 = 'tcp://127.0.0.1:40403/?id=bc5119d47bb4ea7c19100bd97fb11f36970482108bd3d45ff101ee4f6bbec872' + >>> ep_pid2 = Entrypoint.parse(desc_pid2) + >>> desc2_pid2 = 'tcp://foo.bar:40403/?id=bc5119d47bb4ea7c19100bd97fb11f36970482108bd3d45ff101ee4f6bbec872' + >>> ep2_pid2 = Entrypoint.parse(desc2_pid2) + >>> ep_no_pid.peer_id_conflicts_with(ep_no_pid) + False + >>> ep_no_pid.peer_id_conflicts_with(ep_pid1) + False + >>> ep_pid1.peer_id_conflicts_with(ep_no_pid) + False + >>> ep_pid1.peer_id_conflicts_with(ep_pid2) + True + >>> ep_pid1.peer_id_conflicts_with(ep2_pid2) + True + >>> ep_pid2.peer_id_conflicts_with(ep2_pid2) + False + """ + return self.peer_id is not None and other.peer_id is not None and self.peer_id != other.peer_id + + def is_localhost(self) -> bool: + """Used to determine if the entrypoint host is a localhost address. + + Examples: + + >>> Entrypoint.parse('tcp://127.0.0.1:444').is_localhost() + True + >>> Entrypoint.parse('tcp://localhost:444').is_localhost() + True + >>> Entrypoint.parse('tcp://8.8.8.8:444').is_localhost() + False + >>> Entrypoint.parse('tcp://foo.bar:444').is_localhost() + False + """ + if self.host == '127.0.0.1': + return True + if self.host == 'localhost': + return True + return False diff --git a/hathor/p2p/manager.py b/hathor/p2p/manager.py index d7e7f422b..91e390ee0 100644 --- a/hathor/p2p/manager.py +++ b/hathor/p2p/manager.py @@ -25,6 +25,7 @@ from twisted.web.client import Agent from hathor.conf.get_settings import get_global_settings +from hathor.p2p.entrypoint import Entrypoint from hathor.p2p.netfilter.factory import NetfilterFactory from hathor.p2p.peer_discovery import PeerDiscovery from hathor.p2p.peer_id import PeerId @@ -34,7 +35,7 @@ from hathor.p2p.states.ready import ReadyState from hathor.p2p.sync_factory import SyncAgentFactory from hathor.p2p.sync_version import SyncVersion -from hathor.p2p.utils import description_to_connection_string, parse_whitelist +from hathor.p2p.utils import parse_whitelist from hathor.pubsub import HathorEvents, PubSubManager from hathor.reactor import ReactorProtocol as Reactor from hathor.transaction import BaseTransaction @@ -59,7 +60,7 @@ class _SyncRotateInfo(NamedTuple): class _ConnectingPeer(NamedTuple): - connection_string: str + entrypoint: Entrypoint endpoint_deferred: Deferred @@ -360,8 +361,8 @@ def disconnect_all_peers(self, *, force: bool = False) -> None: def on_connection_failure(self, failure: Failure, peer: Optional[PeerId], endpoint: IStreamClientEndpoint) -> None: connecting_peer = self.connecting_peers[endpoint] - connection_string = connecting_peer.connection_string - self.log.warn('connection failure', endpoint=connection_string, failure=failure.getErrorMessage()) + entrypoint = connecting_peer.entrypoint + self.log.warn('connection failure', entrypoint=entrypoint, failure=failure.getErrorMessage()) self.connecting_peers.pop(endpoint) self.pubsub.publish( @@ -467,13 +468,13 @@ def iter_ready_connections(self) -> Iterable[HathorProtocol]: for conn in self.connected_peers.values(): yield conn - def iter_not_ready_endpoints(self) -> Iterable[str]: + def iter_not_ready_endpoints(self) -> Iterable[Entrypoint]: """Iterate over not-ready connections.""" for connecting_peer in self.connecting_peers.values(): - yield connecting_peer.connection_string + yield connecting_peer.entrypoint for protocol in self.handshaking_peers: - if protocol.connection_string is not None: - yield protocol.connection_string + if protocol.entrypoint is not None: + yield protocol.entrypoint else: self.log.warn('handshaking protocol has empty connection string', protocol=protocol) @@ -583,38 +584,50 @@ def connect_to_if_not_connected(self, peer: PeerId, now: int) -> None: if peer.can_retry(now): self.connect_to(self.rng.choice(peer.entrypoints), peer) - def _connect_to_callback(self, protocol: Union[HathorProtocol, TLSMemoryBIOProtocol], peer: Optional[PeerId], - endpoint: IStreamClientEndpoint, connection_string: str, - url_peer_id: Optional[str]) -> None: + def _connect_to_callback( + self, + protocol: Union[HathorProtocol, TLSMemoryBIOProtocol], + peer: Optional[PeerId], + endpoint: IStreamClientEndpoint, + entrypoint: Entrypoint, + ) -> None: """Called when we successfully connect to a peer.""" if isinstance(protocol, HathorProtocol): - protocol.on_outbound_connect(url_peer_id, connection_string) + protocol.on_outbound_connect(entrypoint) else: assert isinstance(protocol.wrappedProtocol, HathorProtocol) - protocol.wrappedProtocol.on_outbound_connect(url_peer_id, connection_string) + protocol.wrappedProtocol.on_outbound_connect(entrypoint) self.connecting_peers.pop(endpoint) - def connect_to(self, description: str, peer: Optional[PeerId] = None, use_ssl: Optional[bool] = None) -> None: + def connect_to( + self, + entrypoint: Entrypoint, + peer: Optional[PeerId] = None, + use_ssl: Optional[bool] = None, + ) -> None: """ Attempt to connect to a peer, even if a connection already exists. Usually you should call `connect_to_if_not_connected`. If `use_ssl` is True, then the connection will be wraped by a TLS. """ + if entrypoint.peer_id is not None and peer is not None and str(entrypoint.peer_id) != peer.id: + self.log.debug('skipping because the entrypoint peer_id does not match the actual peer_id', + entrypoint=entrypoint) + return + for connecting_peer in self.connecting_peers.values(): - if connecting_peer.connection_string == description: - self.log.debug('skipping because we are already connecting to this endpoint', endpoint=description) + if connecting_peer.entrypoint.equals_ignore_peer_id(entrypoint): + self.log.debug('skipping because we are already connecting to this endpoint', entrypoint=entrypoint) return + if self.localhost_only and not entrypoint.is_localhost(): + self.log.debug('skip because of simple localhost check', entrypoint=entrypoint) + return + if use_ssl is None: use_ssl = self.use_ssl - connection_string, peer_id = description_to_connection_string(description) - # When using twisted endpoints we can't have // in the connection string - endpoint_url = connection_string.replace('//', '') - endpoint = endpoints.clientFromString(self.reactor, endpoint_url) - if self.localhost_only: - if ('127.0.0.1' not in endpoint_url) and ('localhost' not in endpoint_url): - return + endpoint = entrypoint.to_client_endpoint(self.reactor) factory: IProtocolFactory if use_ssl: @@ -628,11 +641,11 @@ def connect_to(self, description: str, peer: Optional[PeerId] = None, use_ssl: O peer.increment_retry_attempt(now) deferred = endpoint.connect(factory) - self.connecting_peers[endpoint] = _ConnectingPeer(connection_string, deferred) + self.connecting_peers[endpoint] = _ConnectingPeer(entrypoint, deferred) - deferred.addCallback(self._connect_to_callback, peer, endpoint, connection_string, peer_id) + deferred.addCallback(self._connect_to_callback, peer, endpoint, entrypoint) deferred.addErrback(self.on_connection_failure, peer, endpoint) - self.log.info('connect to ', endpoint=description, peer=str(peer)) + self.log.info('connect to', entrypoint=entrypoint, peer=str(peer)) self.pubsub.publish( HathorEvents.NETWORK_PEER_CONNECTING, peer=peer, @@ -689,19 +702,14 @@ def update_hostname_entrypoints(self, *, old_hostname: str | None, new_hostname: assert self.manager is not None for address in self._listen_addresses: if old_hostname is not None: - old_address_str = self._get_hostname_address_str(old_hostname, address) - if old_address_str in self.my_peer.entrypoints: - self.my_peer.entrypoints.remove(old_address_str) - + old_entrypoint = Entrypoint.from_hostname_address(old_hostname, address) + if old_entrypoint in self.my_peer.entrypoints: + self.my_peer.entrypoints.remove(old_entrypoint) self._add_hostname_entrypoint(new_hostname, address) def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Address) -> None: - hostname_address_str = self._get_hostname_address_str(hostname, address) - self.my_peer.entrypoints.append(hostname_address_str) - - @staticmethod - def _get_hostname_address_str(hostname: str, address: IPv4Address | IPv6Address) -> str: - return '{}://{}:{}'.format(address.type, hostname, address.port).lower() + hostname_entrypoint = Entrypoint.from_hostname_address(hostname, address) + self.my_peer.entrypoints.append(hostname_entrypoint) def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol: """ When there are duplicate connections, determine which one should be dropped. diff --git a/hathor/p2p/peer_discovery/bootstrap.py b/hathor/p2p/peer_discovery/bootstrap.py index cdac2bd2a..a30970ae2 100644 --- a/hathor/p2p/peer_discovery/bootstrap.py +++ b/hathor/p2p/peer_discovery/bootstrap.py @@ -17,6 +17,8 @@ from structlog import get_logger from typing_extensions import override +from hathor.p2p.entrypoint import Entrypoint + from .peer_discovery import PeerDiscovery logger = get_logger() @@ -26,15 +28,15 @@ class BootstrapPeerDiscovery(PeerDiscovery): """ It implements a bootstrap peer discovery, which receives a static list of peers. """ - def __init__(self, descriptions: list[str]): + def __init__(self, entrypoints: list[Entrypoint]): """ :param descriptions: Descriptions of peers to connect to. """ super().__init__() self.log = logger.new() - self.descriptions = descriptions + self.entrypoints = entrypoints @override - async def discover_and_connect(self, connect_to: Callable[[str], None]) -> None: - for description in self.descriptions: - connect_to(description) + async def discover_and_connect(self, connect_to: Callable[[Entrypoint], None]) -> None: + for entrypoint in self.entrypoints: + connect_to(entrypoint) diff --git a/hathor/p2p/peer_discovery/dns.py b/hathor/p2p/peer_discovery/dns.py index 4942af963..3886996a3 100644 --- a/hathor/p2p/peer_discovery/dns.py +++ b/hathor/p2p/peer_discovery/dns.py @@ -13,6 +13,7 @@ # limitations under the License. import socket +from collections.abc import Iterator from typing import Callable from structlog import get_logger @@ -21,6 +22,8 @@ from twisted.names.dns import Record_A, Record_TXT, RRHeader from typing_extensions import override +from hathor.p2p.entrypoint import Entrypoint, Protocol + from .peer_discovery import PeerDiscovery logger = get_logger() @@ -41,21 +44,20 @@ def __init__(self, hosts: list[str], default_port: int = 40403, test_mode: int = self.test_mode = test_mode @override - async def discover_and_connect(self, connect_to: Callable[[str], None]) -> None: + async def discover_and_connect(self, connect_to: Callable[[Entrypoint], None]) -> None: """ Run DNS lookup for host and connect to it This is executed when starting the DNS Peer Discovery and first connecting to the network """ for host in self.hosts: - url_list = await self.dns_seed_lookup(host) - for url in url_list: - connect_to(url) + for entrypoint in (await self.dns_seed_lookup(host)): + connect_to(entrypoint) - async def dns_seed_lookup(self, host: str) -> list[str]: + async def dns_seed_lookup(self, host: str) -> set[Entrypoint]: """ Run a DNS lookup for TXT, A, and AAAA records and return a list of connection strings. """ if self.test_mode: # Useful for testing purposes, so we don't need to execute a DNS query - return ['tcp://127.0.0.1:40403'] + return {Entrypoint.parse('tcp://127.0.0.1:40403')} d1 = lookupText(host) d1.addCallback(self.dns_seed_lookup_text) @@ -65,12 +67,7 @@ async def dns_seed_lookup(self, host: str) -> list[str]: d2.addCallback(self.dns_seed_lookup_address) d2.addErrback(self.errback), - d = defer.gatherResults([d1, d2]) - results = await d - unique_urls: set[str] = set() - for urls in results: - unique_urls.update(urls) - return list(unique_urls) + return set(await defer.gatherResults([d1, d2])) def errback(self, result): """ Return an empty list if any error occur. @@ -80,36 +77,37 @@ def errback(self, result): def dns_seed_lookup_text( self, results: tuple[list[RRHeader], list[RRHeader], list[RRHeader]] - ) -> list[str]: + ) -> Iterator[Entrypoint]: """ Run a DNS lookup for TXT records to discover new peers. The `results` has three lists that contain answer records, authority records, and additional records. """ answers, _, _ = results - ret: list[str] = [] for record in answers: assert isinstance(record.payload, Record_TXT) for txt in record.payload.data: - txt = txt.decode('utf-8') - self.log.info('seed DNS TXT found', endpoint=txt) - ret.append(txt) - return ret + raw_entrypoint = txt.decode('utf-8') + try: + entrypoint = Entrypoint.parse(raw_entrypoint) + except ValueError: + self.log.warning('could not parse entrypoint, skipping it', raw_entrypoint=raw_entrypoint) + continue + self.log.info('seed DNS TXT found', entrypoint=entrypoint) + yield entrypoint def dns_seed_lookup_address( self, results: tuple[list[RRHeader], list[RRHeader], list[RRHeader]] - ) -> list[str]: + ) -> Iterator[Entrypoint]: """ Run a DNS lookup for A records to discover new peers. The `results` has three lists that contain answer records, authority records, and additional records. """ answers, _, _ = results - ret: list[str] = [] for record in answers: assert isinstance(record.payload, Record_A) address = record.payload.address assert address is not None host = socket.inet_ntoa(address) - txt = 'tcp://{}:{}'.format(host, self.default_port) - self.log.info('seed DNS A found', endpoint=txt) - ret.append(txt) - return ret + entrypoint = Entrypoint(Protocol.TCP, host, self.default_port) + self.log.info('seed DNS A found', entrypoint=entrypoint) + yield entrypoint diff --git a/hathor/p2p/peer_discovery/peer_discovery.py b/hathor/p2p/peer_discovery/peer_discovery.py index 1dbf2bf8f..a6ff799ed 100644 --- a/hathor/p2p/peer_discovery/peer_discovery.py +++ b/hathor/p2p/peer_discovery/peer_discovery.py @@ -15,13 +15,15 @@ from abc import ABC, abstractmethod from typing import Callable +from hathor.p2p.entrypoint import Entrypoint + class PeerDiscovery(ABC): """ Base class to implement peer discovery strategies. """ @abstractmethod - async def discover_and_connect(self, connect_to: Callable[[str], None]) -> None: + async def discover_and_connect(self, connect_to: Callable[[Entrypoint], None]) -> None: """ This method must discover the peers and call `connect_to` for each of them. :param connect_to: Function which will be called for each discovered peer. diff --git a/hathor/p2p/peer_id.py b/hathor/p2p/peer_id.py index 1ab1ae58e..567ae7737 100644 --- a/hathor/p2p/peer_id.py +++ b/hathor/p2p/peer_id.py @@ -31,7 +31,8 @@ from hathor.conf.get_settings import get_global_settings from hathor.daa import DifficultyAdjustmentAlgorithm -from hathor.p2p.utils import connection_string_to_host, discover_dns, generate_certificate +from hathor.p2p.entrypoint import Entrypoint +from hathor.p2p.utils import discover_dns, generate_certificate from hathor.util import not_none if TYPE_CHECKING: @@ -59,7 +60,7 @@ class PeerId: """ id: Optional[str] - entrypoints: list[str] + entrypoints: list[Entrypoint] private_key: Optional[rsa.RSAPrivateKeyWithSerialization] public_key: Optional[rsa.RSAPublicKey] certificate: Optional[x509.Certificate] @@ -200,7 +201,11 @@ def create_from_json(cls, data: dict[str, Any]) -> 'PeerId': obj.private_key = private_key if 'entrypoints' in data: - obj.entrypoints = data['entrypoints'] + for entrypoint_string in data['entrypoints']: + entrypoint = Entrypoint.parse(entrypoint_string) + if entrypoint.peer_id is not None: + raise ValueError('do not add id= to peer.json entrypoints') + obj.entrypoints.append(entrypoint) # TODO(epnichols): call obj.validate()? return obj @@ -243,7 +248,7 @@ def to_json(self, include_private_key: bool = False) -> dict[str, Any]: result = { 'id': self.id, 'pubKey': base64.b64encode(public_der).decode('utf-8'), - 'entrypoints': self.entrypoints, + 'entrypoints': [str(ep) for ep in self.entrypoints], } if include_private_key: assert self.private_key is not None @@ -352,23 +357,24 @@ async def validate_entrypoint(self, protocol: 'HathorProtocol') -> bool: # Entrypoint validation with connection string and connection host # Entrypoints have the format tcp://IP|name:port for entrypoint in self.entrypoints: - if protocol.connection_string: + if protocol.entrypoint is not None: # Connection string has the format tcp://IP:port # So we must consider that the entrypoint could be in name format - if protocol.connection_string == entrypoint: + if protocol.entrypoint.equals_ignore_peer_id(entrypoint): + # XXX: wrong peer-id should not make it into self.entrypoints + assert not protocol.entrypoint.peer_id_conflicts_with(entrypoint), 'wrong peer-id was added before' # Found the entrypoint found_entrypoint = True break - host = connection_string_to_host(entrypoint) # TODO: don't use `daa.TEST_MODE` for this test_mode = not_none(DifficultyAdjustmentAlgorithm.singleton).TEST_MODE - result = await discover_dns(host, test_mode) - if protocol.connection_string in result: + result = await discover_dns(entrypoint.host, test_mode) + if protocol.entrypoint in result: # Found the entrypoint found_entrypoint = True break else: - # When the peer is the server part of the connection we don't have the full connection_string + # When the peer is the server part of the connection we don't have the full entrypoint description # So we can only validate the host from the protocol assert protocol.transport is not None connection_remote = protocol.transport.getPeer() @@ -377,13 +383,12 @@ async def validate_entrypoint(self, protocol: 'HathorProtocol') -> bool: continue # Connection host has only the IP # So we must consider that the entrypoint could be in name format and we just validate the host - host = connection_string_to_host(entrypoint) - if connection_host == host: + if connection_host == entrypoint.host: found_entrypoint = True break test_mode = not_none(DifficultyAdjustmentAlgorithm.singleton).TEST_MODE - result = await discover_dns(host, test_mode) - if connection_host in [connection_string_to_host(x) for x in result]: + result = await discover_dns(entrypoint.host, test_mode) + if connection_host in [entrypoint.host for entrypoint in result]: # Found the entrypoint found_entrypoint = True break diff --git a/hathor/p2p/protocol.py b/hathor/p2p/protocol.py index e4cff0d5a..dfaf1a46b 100644 --- a/hathor/p2p/protocol.py +++ b/hathor/p2p/protocol.py @@ -24,6 +24,7 @@ from twisted.python.failure import Failure from hathor.conf.get_settings import get_global_settings +from hathor.p2p.entrypoint import Entrypoint from hathor.p2p.messages import ProtocolMessages from hathor.p2p.peer_id import PeerId from hathor.p2p.rate_limiter import RateLimiter @@ -82,8 +83,7 @@ class WarningFlags(str, Enum): state: Optional[BaseState] connection_time: float _state_instances: dict[PeerState, BaseState] - connection_string: Optional[str] - expected_peer_id: Optional[str] + entrypoint: Optional[Entrypoint] warning_flags: set[str] aborting: bool diff_timestamp: Optional[int] @@ -138,7 +138,7 @@ def __init__(self, network: str, my_peer: PeerId, p2p_manager: 'ConnectionsManag # Connection string of the peer # Used to validate if entrypoints has this string - self.connection_string: Optional[str] = None + self.entrypoint: Optional[Entrypoint] = None # Peer id sent in the connection url that is expected to connect (optional) self.expected_peer_id: Optional[str] = None @@ -243,17 +243,10 @@ def on_connect(self) -> None: if self.connections: self.connections.on_peer_connect(self) - def on_outbound_connect(self, url_peer_id: Optional[str], connection_string: str) -> None: + def on_outbound_connect(self, entrypoint: Entrypoint) -> None: """Called when we successfully establish an outbound connection to a peer.""" - if url_peer_id: - # Set in protocol the peer id extracted from the URL that must be validated - self.expected_peer_id = url_peer_id - else: - # Add warning flag - self.warning_flags.add(self.WarningFlags.NO_PEER_ID_URL) - - # Setting connection string in protocol, so we can validate it matches the entrypoints data - self.connection_string = connection_string + # Save the used entrypoint in protocol so we can validate that it matches the entrypoints data + self.entrypoint = entrypoint def on_peer_ready(self) -> None: assert self.connections is not None diff --git a/hathor/p2p/resources/add_peers.py b/hathor/p2p/resources/add_peers.py index 85d7d7960..75a39b901 100644 --- a/hathor/p2p/resources/add_peers.py +++ b/hathor/p2p/resources/add_peers.py @@ -20,6 +20,7 @@ from hathor.api_util import Resource, render_options, set_cors from hathor.cli.openapi_files.register import register_resource from hathor.manager import HathorManager +from hathor.p2p.entrypoint import Entrypoint from hathor.p2p.peer_discovery import BootstrapPeerDiscovery from hathor.util import json_dumpb, json_loadb @@ -48,41 +49,46 @@ def render_POST(self, request: Request) -> bytes: return json_dumpb({'success': False, 'message': 'No post data'}) try: - peers = json_loadb(raw_data) + raw_entrypoints = json_loadb(raw_data) except (JSONDecodeError, AttributeError): return json_dumpb({'success': False, 'message': 'Invalid format for post data'}) - if not isinstance(peers, list): + if not isinstance(raw_entrypoints, list): return json_dumpb({ 'success': False, 'message': 'Invalid format for post data. It was expected a list of strings.' }) - known_peers = self.manager.connections.peer_storage.values() + try: + entrypoints = list(map(Entrypoint.parse, raw_entrypoints)) + except ValueError: + return json_dumpb({ + 'success': False, + 'message': 'Malformed entrypoint found.' + }) - def already_connected(connection_string: str) -> bool: - # determines if given connection string is already among connected or connecting peers - endpoint_url = connection_string.replace('//', '') + known_peers = self.manager.connections.peer_storage.values() + def already_connected(entrypoint: Entrypoint) -> bool: # ignore peers that we're already trying to connect - if endpoint_url in self.manager.connections.iter_not_ready_endpoints(): + if entrypoint in self.manager.connections.iter_not_ready_endpoints(): return True # remove peers we already know about for peer in known_peers: - if connection_string in peer.entrypoints: + if entrypoint in peer.entrypoints: return True return False - filtered_peers = [connection_string for connection_string in peers if not already_connected(connection_string)] + filtered_peers = [entrypoint for entrypoint in entrypoints if not already_connected(entrypoint)] pd = BootstrapPeerDiscovery(filtered_peers) # this fires and forget the coroutine, which is compatible with the original behavior coro = pd.discover_and_connect(self.manager.connections.connect_to) Deferred.fromCoroutine(coro) - ret = {'success': True, 'peers': filtered_peers} + ret = {'success': True, 'peers': [str(p) for p in filtered_peers]} return json_dumpb(ret) def render_OPTIONS(self, request: Request) -> int: diff --git a/hathor/p2p/utils.py b/hathor/p2p/utils.py index 4e2935a2e..5683224f9 100644 --- a/hathor/p2p/utils.py +++ b/hathor/p2p/utils.py @@ -15,7 +15,6 @@ import datetime import re from typing import Any, Optional -from urllib.parse import parse_qs, urlparse import requests from cryptography import x509 @@ -29,6 +28,7 @@ from hathor.conf.get_settings import get_global_settings from hathor.indexes.height_index import HeightInfo +from hathor.p2p.entrypoint import Entrypoint from hathor.p2p.peer_discovery import DNSPeerDiscovery from hathor.transaction.genesis import get_representation_for_all_genesis @@ -52,25 +52,6 @@ def discover_ip_ipify(timeout: float | None = None) -> Optional[str]: return None -def description_to_connection_string(description: str) -> tuple[str, Optional[str]]: - """ The description returned from DNS query may contain a peer-id parameter - This method splits this description into the connection URL and the peer-id (in case it exists) - Expected description is something like: tcp://127.0.0.1:40403/?id=123 - The expected returned tuple in this case would be ('tcp://127.0.0.1:40403', '123') - """ - result = urlparse(description) - - url = "{}://{}".format(result.scheme, result.netloc) - peer_id = None - - if result.query: - query_result = parse_qs(result.query) - if 'id' in query_result: - peer_id = query_result['id'][0] - - return url, peer_id - - def get_genesis_short_hash() -> str: """ Return the first 7 chars of the GENESIS_HASH used for validation that the genesis are the same """ @@ -92,14 +73,7 @@ def get_settings_hello_dict() -> dict[str, Any]: return settings_dict -def connection_string_to_host(connection_string: str) -> str: - """ From a connection string I return the host - tcp://127.0.0.1:40403 -> 127.0.0.1 - """ - return urlparse(connection_string).netloc.split(':')[0] - - -async def discover_dns(host: str, test_mode: int = 0) -> list[str]: +async def discover_dns(host: str, test_mode: int = 0) -> list[Entrypoint]: """ Start a DNS peer discovery object and execute a search for the host Returns the DNS string from the requested host @@ -107,7 +81,7 @@ async def discover_dns(host: str, test_mode: int = 0) -> list[str]: """ discovery = DNSPeerDiscovery([], test_mode=test_mode) result = await discovery.dns_seed_lookup(host) - return result + return list(result) def generate_certificate(private_key: RSAPrivateKey, ca_file: str, ca_pkey_file: str) -> Certificate: diff --git a/hathor/types.py b/hathor/types.py index 40ad6dead..7dfa808aa 100644 --- a/hathor/types.py +++ b/hathor/types.py @@ -24,3 +24,87 @@ Timestamp: TypeAlias = int # NewType('Timestamp', int) TokenUid: TypeAlias = VertexId # NewType('TokenUid', VertexId) Amount: TypeAlias = int # NewType('Amount', int) + + +class Hash: + r""" A type for easily representing a 32-byte hash, it is not meant to be used directly. + + Instead it is meant to be used to make new-types that also happen to be a hash. This class will provide convenient + methods for parsing and representing it. + + Examples: + + >>> x = Hash('000006cb93385b8b87a545a1cbb6197e6caff600c12cc12fc54250d39c8088fc') + >>> bytes(x) + b'\x00\x00\x06\xcb\x938[\x8b\x87\xa5E\xa1\xcb\xb6\x19~l\xaf\xf6\x00\xc1,\xc1/\xc5BP\xd3\x9c\x80\x88\xfc' + + >>> Hash(b'\x00\x00\x06\xcb\x938[\x8b\x87\xa5E\xa1\xcb\xb6\x19~l\xaf\xf6\x00\xc1,\xc1/\xc5BP\xd3\x9c\x80\x88\xfc') + Hash('000006cb93385b8b87a545a1cbb6197e6caff600c12cc12fc54250d39c8088fc') + + >>> str(x) + '000006cb93385b8b87a545a1cbb6197e6caff600c12cc12fc54250d39c8088fc' + + >>> repr(x) + "Hash('000006cb93385b8b87a545a1cbb6197e6caff600c12cc12fc54250d39c8088fc')" + + >>> {x} + {Hash('000006cb93385b8b87a545a1cbb6197e6caff600c12cc12fc54250d39c8088fc')} + + >>> class Foo(Hash): + ... pass + >>> y = Foo('000006cb93385b8b87a545a1cbb6197e6caff600c12cc12fc54250d39c8088fc') + >>> repr(y) + "Foo('000006cb93385b8b87a545a1cbb6197e6caff600c12cc12fc54250d39c8088fc')" + + >>> x == y + True + + >>> {x: 123}[y] + 123 + + >>> Hash('000006cb93385b8b87a545a1cbb6197e6caff600c12cc12fc54250d39c8088fc34') + Traceback (most recent call last): + ... + ValueError: expected 32 bytes, got 33 bytes + + >>> Hash('000006cb93385b8b87a545a1cbb6197e6caff600c12cc12fc54250d39c8088') + Traceback (most recent call last): + ... + ValueError: expected 32 bytes, got 31 bytes + + >>> Hash('000006cb93385b8b87a545a1cbb6197e6caff600c12cc12fc54250d39c8088f') + Traceback (most recent call last): + ... + ValueError: non-hexadecimal number found in fromhex() arg at position 63 + + >>> Hash(123) + Traceback (most recent call last): + ... + TypeError: expected a bytes or str instance, got a instead + """ + __slots__ = ('_inner',) + _inner: bytes + + def __init__(self, inner: bytes | str) -> None: + if isinstance(inner, str): + inner = bytes.fromhex(inner) + if not isinstance(inner, bytes): + raise TypeError(f'expected a bytes or str instance, got a {repr(type(inner))} instead') + if len(inner) != 32: + raise ValueError(f'expected 32 bytes, got {len(inner)} bytes') + self._inner = inner + + def __bytes__(self): + return self._inner + + def __str__(self): + return self._inner.hex() + + def __repr__(self): + return f"{type(self).__name__}('{self}')" + + def __hash__(self): + return hash(self._inner) + + def __eq__(self, other): + return self._inner.__eq__(other._inner) diff --git a/tests/others/test_metrics.py b/tests/others/test_metrics.py index 24ef212cc..ded64ad6c 100644 --- a/tests/others/test_metrics.py +++ b/tests/others/test_metrics.py @@ -3,6 +3,7 @@ import pytest +from hathor.p2p.entrypoint import Entrypoint from hathor.p2p.manager import PeerConnectionsMetrics from hathor.p2p.peer_id import PeerId from hathor.p2p.protocol import HathorProtocol @@ -65,7 +66,7 @@ def test_connections_manager_integration(self): manager.connections.handshaking_peers.update({Mock()}) # Execution - endpoint = 'tcp://127.0.0.1:8005' + endpoint = Entrypoint.parse('tcp://127.0.0.1:8005') # This will trigger sending to the pubsub one of the network events manager.connections.connect_to(endpoint, use_ssl=True) diff --git a/tests/p2p/test_connections.py b/tests/p2p/test_connections.py index c75abea7e..a9e33b79f 100644 --- a/tests/p2p/test_connections.py +++ b/tests/p2p/test_connections.py @@ -2,6 +2,7 @@ import pytest +from hathor.p2p.entrypoint import Entrypoint from tests import unittest from tests.utils import run_server @@ -20,7 +21,7 @@ def test_connections(self) -> None: def test_manager_connections(self) -> None: manager = self.create_peer('testnet', enable_sync_v1=True, enable_sync_v2=False) - endpoint = 'tcp://127.0.0.1:8005' + endpoint = Entrypoint.parse('tcp://127.0.0.1:8005') manager.connections.connect_to(endpoint, use_ssl=True) self.assertIn(endpoint, manager.connections.iter_not_ready_endpoints()) diff --git a/tests/p2p/test_peer_id.py b/tests/p2p/test_peer_id.py index bccb9bcb2..aec32921a 100644 --- a/tests/p2p/test_peer_id.py +++ b/tests/p2p/test_peer_id.py @@ -6,6 +6,7 @@ from twisted.internet.interfaces import ITransport +from hathor.p2p.entrypoint import Entrypoint from hathor.p2p.peer_id import InvalidPeerIdException, PeerId from hathor.p2p.peer_storage import PeerStorage from hathor.util import not_none @@ -90,17 +91,21 @@ def test_merge_peer(self) -> None: self.assertEqual(peer.public_key, p1.public_key) self.assertEqual(peer.entrypoints, []) + ep1 = Entrypoint.parse('tcp://127.0.0.1:1001') + ep2 = Entrypoint.parse('tcp://127.0.0.1:1002') + ep3 = Entrypoint.parse('tcp://127.0.0.1:1003') + p3 = PeerId() - p3.entrypoints.append('1') - p3.entrypoints.append('3') + p3.entrypoints.append(ep1) + p3.entrypoints.append(ep2) p3.public_key = None p4 = PeerId() p4.public_key = None p4.private_key = None p4.id = p3.id - p4.entrypoints.append('2') - p4.entrypoints.append('3') + p4.entrypoints.append(ep2) + p4.entrypoints.append(ep3) peer_storage.add_or_merge(p4) self.assertEqual(len(peer_storage), 2) @@ -111,7 +116,7 @@ def test_merge_peer(self) -> None: peer = peer_storage[not_none(p3.id)] self.assertEqual(peer.id, p3.id) self.assertEqual(peer.private_key, p3.private_key) - self.assertEqual(peer.entrypoints, ['2', '3', '1']) + self.assertEqual(set(peer.entrypoints), {ep1, ep2, ep3}) with self.assertRaises(ValueError): peer_storage.add(p1) @@ -216,25 +221,25 @@ class BasePeerIdTest(unittest.TestCase): async def test_validate_entrypoint(self) -> None: manager = self.create_peer('testnet', unlock_wallet=False) peer_id = manager.my_peer - peer_id.entrypoints = ['tcp://127.0.0.1:40403'] + peer_id.entrypoints = [Entrypoint.parse('tcp://127.0.0.1:40403')] # we consider that we are starting the connection to the peer protocol = manager.connections.client_factory.buildProtocol('127.0.0.1') - protocol.connection_string = 'tcp://127.0.0.1:40403' + protocol.entrypoint = Entrypoint.parse('tcp://127.0.0.1:40403') result = await peer_id.validate_entrypoint(protocol) self.assertTrue(result) # if entrypoint is an URI - peer_id.entrypoints = ['uri_name'] + peer_id.entrypoints = [Entrypoint.parse('tcp://uri_name:40403')] result = await peer_id.validate_entrypoint(protocol) self.assertTrue(result) # test invalid. DNS in test mode will resolve to '127.0.0.1:40403' - protocol.connection_string = 'tcp://45.45.45.45:40403' + protocol.entrypoint = Entrypoint.parse('tcp://45.45.45.45:40403') result = await peer_id.validate_entrypoint(protocol) self.assertFalse(result) # now test when receiving the connection - i.e. the peer starts it - protocol.connection_string = None - peer_id.entrypoints = ['tcp://127.0.0.1:40403'] + protocol.entrypoint = None + peer_id.entrypoints = [Entrypoint.parse('tcp://127.0.0.1:40403')] from collections import namedtuple Peer = namedtuple('Peer', 'host') @@ -246,7 +251,7 @@ def getPeer(self) -> Peer: result = await peer_id.validate_entrypoint(protocol) self.assertTrue(result) # if entrypoint is an URI - peer_id.entrypoints = ['uri_name'] + peer_id.entrypoints = [Entrypoint.parse('tcp://uri_name:40403')] result = await peer_id.validate_entrypoint(protocol) self.assertTrue(result) diff --git a/tests/resources/p2p/test_add_peer.py b/tests/resources/p2p/test_add_peer.py index 4d61fcb3b..c22598b8a 100644 --- a/tests/resources/p2p/test_add_peer.py +++ b/tests/resources/p2p/test_add_peer.py @@ -1,5 +1,6 @@ from twisted.internet.defer import inlineCallbacks +from hathor.p2p.entrypoint import Entrypoint from hathor.p2p.peer_id import PeerId from hathor.p2p.resources import AddPeersResource from tests import unittest @@ -21,7 +22,7 @@ def test_connecting_peers(self): # test when we send a peer we're already connected to peer = PeerId() - peer.entrypoints = ['tcp://localhost:8006'] + peer.entrypoints = [Entrypoint.parse('tcp://localhost:8006')] self.manager.connections.peer_storage.add(peer) response = yield self.web.post('p2p/peers', ['tcp://localhost:8006', 'tcp://localhost:8007']) data = response.json_value()