diff --git a/hathor/metrics.py b/hathor/metrics.py index b53752342..50716527f 100644 --- a/hathor/metrics.py +++ b/hathor/metrics.py @@ -109,8 +109,8 @@ class Metrics: # Variables to store the last block when we updated the RocksDB storage metrics last_txstorage_data_block: Optional[int] = None - # Peers connected - connected_peers: int = 0 + # Peers ready + ready_peers: int = 0 # Peers handshaking handshaking_peers: int = 0 # Peers connecting @@ -200,7 +200,7 @@ def handle_publish(self, key: HathorEvents, args: EventArguments) -> None: ): peers_connection_metrics: PeerConnectionsMetrics = data["peers_count"] - self.connected_peers = peers_connection_metrics.connected_peers_count + self.ready_peers = peers_connection_metrics.ready_peers_count self.connecting_peers = peers_connection_metrics.connecting_peers_count self.handshaking_peers = peers_connection_metrics.handshaking_peers_count self.known_peers = peers_connection_metrics.known_peers_count @@ -247,14 +247,14 @@ def collect_peer_connection_metrics(self) -> None: """ self.peer_connection_metrics.clear() - for connection in self.connections.connections: + for connection in self.connections.get_connected_peers(): if not connection._peer: # A connection without peer will not be able to communicate # So we can just discard it for the sake of the metrics continue metric = PeerConnectionMetrics( - connection_string=str(connection.entrypoint) if connection.entrypoint else "", + connection_string=str(connection.addr), peer_id=str(connection.peer.id), network=settings.NETWORK_NAME, received_messages=connection.metrics.received_messages, diff --git a/hathor/p2p/factory.py b/hathor/p2p/factory.py index 832f2e501..1b449d84f 100644 --- a/hathor/p2p/factory.py +++ b/hathor/p2p/factory.py @@ -20,6 +20,7 @@ from hathor.conf.settings import HathorSettings from hathor.p2p.manager import ConnectionsManager from hathor.p2p.peer import PrivatePeer +from hathor.p2p.peer_endpoint import PeerAddress from hathor.p2p.protocol import 
HathorLineReceiver @@ -41,15 +42,14 @@ def __init__( self.use_ssl = use_ssl def buildProtocol(self, addr: IAddress) -> HathorLineReceiver: - p = HathorLineReceiver( + return HathorLineReceiver( + addr=PeerAddress.from_address(addr), my_peer=self.my_peer, p2p_manager=self.p2p_manager, use_ssl=self.use_ssl, inbound=self.inbound, - settings=self._settings + settings=self._settings, ) - p.factory = self - return p class HathorServerFactory(_HathorLineReceiverFactory, protocol.ServerFactory): diff --git a/hathor/p2p/manager.py b/hathor/p2p/manager.py index d7e7045c9..d44d21aa0 100644 --- a/hathor/p2p/manager.py +++ b/hathor/p2p/manager.py @@ -12,13 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + from typing import TYPE_CHECKING, Any, Iterable, NamedTuple, Optional from structlog import get_logger from twisted.internet import endpoints from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.defer import Deferred -from twisted.internet.interfaces import IListeningPort, IProtocol, IProtocolFactory, IStreamClientEndpoint +from twisted.internet.interfaces import IListeningPort, IProtocol, IProtocolFactory from twisted.internet.task import LoopingCall from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol from twisted.python.failure import Failure @@ -27,6 +29,7 @@ from hathor.conf.settings import HathorSettings from hathor.p2p.netfilter.factory import NetfilterFactory from hathor.p2p.peer import PrivatePeer, PublicPeer, UnverifiedPeer +from hathor.p2p.peer_connections import PeerConnections from hathor.p2p.peer_discovery import PeerDiscovery from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint from hathor.p2p.peer_id import PeerId @@ -59,15 +62,10 @@ class _SyncRotateInfo(NamedTuple): to_enable: set[PeerId] -class _ConnectingPeer(NamedTuple): - entrypoint: PeerEndpoint - endpoint_deferred: Deferred - - class 
PeerConnectionsMetrics(NamedTuple): connecting_peers_count: int handshaking_peers_count: int - connected_peers_count: int + ready_peers_count: int known_peers_count: int @@ -79,10 +77,6 @@ class GlobalRateLimiter: SEND_TIPS = 'NodeSyncTimestamp.send_tips' manager: Optional['HathorManager'] - connections: set[HathorProtocol] - connected_peers: dict[PeerId, HathorProtocol] - connecting_peers: dict[IStreamClientEndpoint, _ConnectingPeer] - handshaking_peers: set[HathorProtocol] whitelist_only: bool unverified_peer_storage: UnverifiedPeerStorage verified_peer_storage: VerifiedPeerStorage @@ -144,17 +138,7 @@ def __init__( self.rate_limiter = RateLimiter(self.reactor) self.enable_rate_limiter() - # All connections. - self.connections = set() - - # List of pending connections. - self.connecting_peers = {} - - # List of peers connected but still not ready to communicate. - self.handshaking_peers = set() - - # List of peers connected and ready to communicate. - self.connected_peers = {} + self._connections = PeerConnections() # List of peers received from the network. # We cannot trust their identity before we connect to them. 
@@ -328,12 +312,12 @@ def stop(self) -> None: def _get_peers_count(self) -> PeerConnectionsMetrics: """Get a dict containing the count of peers in each state""" - + peer_counts = self._connections.get_peer_counts() return PeerConnectionsMetrics( - len(self.connecting_peers), - len(self.handshaking_peers), - len(self.connected_peers), - len(self.verified_peer_storage) + connecting_peers_count=peer_counts.connecting, + handshaking_peers_count=peer_counts.handshaking, + ready_peers_count=peer_counts.ready, + known_peers_count=len(self.verified_peer_storage) ) def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory: @@ -371,31 +355,25 @@ def send_tx_to_peers(self, tx: BaseTransaction) -> None: def disconnect_all_peers(self, *, force: bool = False) -> None: """Disconnect all peers.""" - for conn in self.iter_all_connections(): + for conn in self.get_connected_peers(): conn.disconnect(force=force) - def on_connection_failure(self, failure: Failure, peer: Optional[UnverifiedPeer | PublicPeer], - endpoint: IStreamClientEndpoint) -> None: - connecting_peer = self.connecting_peers[endpoint] - entrypoint = connecting_peer.entrypoint - self.log.warn('connection failure', entrypoint=str(entrypoint), failure=failure.getErrorMessage()) - self.connecting_peers.pop(endpoint) - + def on_connection_failure(self, failure: Failure, endpoint: PeerEndpoint) -> None: + self.log.warn('connection failure', endpoint=str(endpoint), failure=failure.getErrorMessage()) + self._connections.on_failed_to_connect(addr=endpoint.addr) self.pubsub.publish( HathorEvents.NETWORK_PEER_CONNECTION_FAILED, - peer=peer, peers_count=self._get_peers_count() ) def on_peer_connect(self, protocol: HathorProtocol) -> None: - """Called when a new connection is established.""" - if len(self.connections) >= self.max_connections: + """Called when a new connection is established from both inbound and outbound peers.""" + if len(self._connections.connected_peers()) >= self.max_connections: 
self.log.warn('reached maximum number of connections', max_connections=self.max_connections) protocol.disconnect(force=True) return - self.connections.add(protocol) - self.handshaking_peers.add(protocol) + self._connections.on_connected(protocol=protocol) self.pubsub.publish( HathorEvents.NETWORK_PEER_CONNECTED, protocol=protocol, @@ -404,12 +382,9 @@ def on_peer_connect(self, protocol: HathorProtocol) -> None: def on_peer_ready(self, protocol: HathorProtocol) -> None: """Called when a peer is ready.""" - assert protocol.peer is not None self.verified_peer_storage.add_or_replace(protocol.peer) - assert protocol.peer.id is not None - - self.handshaking_peers.remove(protocol) self.unverified_peer_storage.pop(protocol.peer.id, None) + connection_to_drop = self._connections.on_ready(addr=protocol.addr, peer_id=protocol.peer.id) # we emit the event even if it's a duplicate peer as a matching # NETWORK_PEER_DISCONNECTED will be emitted regardless @@ -419,21 +394,17 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None: peers_count=self._get_peers_count() ) - if protocol.peer.id in self.connected_peers: + if connection_to_drop: # connected twice to same peer - self.log.warn('duplicate connection to peer', protocol=protocol) - conn = self.get_connection_to_drop(protocol) - self.reactor.callLater(0, self.drop_connection, conn) - if conn == protocol: - # the new connection is being dropped, so don't save it to connected_peers + self.log.warn('duplicate connection to peer', addr=str(protocol.addr), peer_id=str(protocol.peer.id)) + self.reactor.callLater(0, self.drop_connection, connection_to_drop) + if connection_to_drop == protocol: return - self.connected_peers[protocol.peer.id] = protocol - # In case it was a retry, we must reset the data only here, after it gets ready protocol.peer.info.reset_retry_timestamp() - if len(self.connected_peers) <= self.MAX_ENABLED_SYNC: + if len(self._connections.ready_peers()) <= self.MAX_ENABLED_SYNC: protocol.enable_sync() if 
protocol.peer.id in self.always_enable_sync: @@ -450,54 +421,55 @@ def relay_peer_to_ready_connections(self, peer: PublicPeer) -> None: assert isinstance(conn.state, ReadyState) conn.state.send_peers([peer]) - def on_peer_disconnect(self, protocol: HathorProtocol) -> None: - """Called when a peer disconnect.""" - self.connections.discard(protocol) - if protocol in self.handshaking_peers: - self.handshaking_peers.remove(protocol) - if protocol._peer is not None: - existing_protocol = self.connected_peers.pop(protocol.peer.id, None) - if existing_protocol is None: - # in this case, the connection was closed before it got to READY state - return - if existing_protocol != protocol: - # this is the case we're closing a duplicate connection. We need to set the - # existing protocol object back to connected_peers, as that connection is still ongoing. - # A check for duplicate connections is done during PEER_ID state, but there's still a - # chance it can happen if both connections start at the same time and none of them has - # reached READY state while the other is on PEER_ID state - self.connected_peers[protocol.peer.id] = existing_protocol + def on_handshake_disconnect(self, *, addr: PeerAddress) -> None: + """Called when a peer disconnects from a handshaking state (HELLO or PEER-ID).""" + self._connections.on_handshake_disconnect(addr=addr) + self.pubsub.publish( + HathorEvents.NETWORK_PEER_DISCONNECTED, + peers_count=self._get_peers_count() + ) + + def on_ready_disconnect(self, *, addr: PeerAddress, peer_id: PeerId) -> None: + """Called when a peer disconnects from the READY state.""" + self._connections.on_ready_disconnect(addr=addr, peer_id=peer_id) self.pubsub.publish( HathorEvents.NETWORK_PEER_DISCONNECTED, - protocol=protocol, peers_count=self._get_peers_count() ) - def iter_all_connections(self) -> Iterable[HathorProtocol]: - """Iterate over all connections.""" - for conn in self.connections: - yield conn + def on_unknown_disconnect(self, *, addr: PeerAddress) 
-> None: + """Called when a peer disconnects from an unknown state (None).""" + self._connections.on_unknown_disconnect(addr=addr) + self.pubsub.publish( + HathorEvents.NETWORK_PEER_DISCONNECTED, + peers_count=self._get_peers_count() + ) + + def iter_connecting_outbound_peers(self) -> Iterable[PeerAddress]: + yield from self._connections.connecting_outbound_peers() + + def iter_handshaking_peers(self) -> Iterable[HathorProtocol]: + yield from self._connections.handshaking_peers().values() def iter_ready_connections(self) -> Iterable[HathorProtocol]: """Iterate over ready connections.""" - for conn in self.connected_peers.values(): - yield conn + yield from self._connections.ready_peers().values() - def iter_not_ready_endpoints(self) -> Iterable[PeerEndpoint]: + def iter_not_ready_endpoints(self) -> Iterable[PeerAddress]: """Iterate over not-ready connections.""" - for connecting_peer in self.connecting_peers.values(): - yield connecting_peer.entrypoint - for protocol in self.handshaking_peers: - if protocol.entrypoint is not None: - yield protocol.entrypoint - else: - self.log.warn('handshaking protocol has empty connection string', protocol=protocol) + yield from self._connections.not_ready_peers() + + def get_connected_peers(self) -> Iterable[HathorProtocol]: + yield from self._connections.connected_peers().values() - def is_peer_connected(self, peer_id: PeerId) -> bool: + def get_ready_peer_by_id(self, peer_id: PeerId) -> HathorProtocol | None: + return self._connections.get_ready_peer_by_id(peer_id) + + def is_peer_ready(self, peer_id: PeerId) -> bool: """ :type peer_id: string (peer.id) """ - return peer_id in self.connected_peers + return self._connections.is_peer_ready(peer_id) def on_receive_peer(self, peer: UnverifiedPeer, origin: Optional[ReadyState] = None) -> None: """ Update a peer information in our storage, and instantly attempt to connect @@ -514,7 +486,7 @@ def peers_cleanup(self) -> None: to_be_removed: list[PublicPeer] = [] for peer in 
self.verified_peer_storage.values(): assert peer.id is not None - if self.is_peer_connected(peer.id): + if self.is_peer_ready(peer.id): continue dt = now - peer.info.last_seen if dt > self.max_peer_unseen_dt: @@ -593,10 +565,10 @@ def connect_to_if_not_connected(self, peer: UnverifiedPeer | PublicPeer, now: in # It makes no sense to keep storing peers that have disconnected and have no entrypoints # We will never be able to connect to them anymore and they will only keep spending memory # and other resources when used in APIs, so we are removing them here - if peer.id not in self.connected_peers: + if not self.is_peer_ready(peer.id): self.verified_peer_storage.remove(peer) return - if peer.id in self.connected_peers: + if self.is_peer_ready(peer.id): return assert peer.id is not None @@ -612,90 +584,65 @@ def connect_to_if_not_connected(self, peer: UnverifiedPeer | PublicPeer, now: in self.connect_to(addr.with_id(peer.id), peer) - def _connect_to_callback( - self, - protocol: IProtocol, - peer: UnverifiedPeer | PublicPeer | None, - endpoint: IStreamClientEndpoint, - entrypoint: PeerEndpoint, - ) -> None: - """Called when we successfully connect to a peer.""" - if isinstance(protocol, HathorProtocol): - protocol.on_outbound_connect(entrypoint, peer) - else: - assert isinstance(protocol, TLSMemoryBIOProtocol) - assert isinstance(protocol.wrappedProtocol, HathorProtocol) - protocol.wrappedProtocol.on_outbound_connect(entrypoint, peer) - self.connecting_peers.pop(endpoint) + def _connect_to_callback(self, protocol: IProtocol, addr: PeerAddress, peer_id: PeerId | None) -> None: + """Called when we successfully connect to an outbound peer.""" + if isinstance(protocol, TLSMemoryBIOProtocol): + protocol = protocol.wrappedProtocol + assert isinstance(protocol, HathorProtocol) + assert protocol.addr == addr + protocol.on_outbound_connect(peer_id) def connect_to( self, - entrypoint: PeerEndpoint, + endpoint: PeerEndpoint, peer: UnverifiedPeer | PublicPeer | None = None, - 
use_ssl: bool | None = None, - ) -> None: - """ Attempt to connect to a peer, even if a connection already exists. - Usually you should call `connect_to_if_not_connected`. + ) -> Deferred[IProtocol] | None: + """Attempt to connect to a peer through a specific endpoint.""" + if endpoint.peer_id is not None and peer is not None: + assert endpoint.peer_id == peer.id, 'the entrypoint peer_id does not match the actual peer_id' - If `use_ssl` is True, then the connection will be wraped by a TLS. - """ - if entrypoint.peer_id is not None and peer is not None and entrypoint.peer_id != peer.id: - self.log.debug('skipping because the entrypoint peer_id does not match the actual peer_id', - entrypoint=str(entrypoint)) - return - - for connecting_peer in self.connecting_peers.values(): - if connecting_peer.entrypoint.addr == entrypoint.addr: - self.log.debug( - 'skipping because we are already connecting to this endpoint', - entrypoint=str(entrypoint), - ) - return + if self.localhost_only and not endpoint.addr.is_localhost(): + self.log.debug('skip because of simple localhost check', endpoint=str(endpoint)) + return None - if self.localhost_only and not entrypoint.addr.is_localhost(): - self.log.debug('skip because of simple localhost check', entrypoint=str(entrypoint)) - return + if not self.enable_ipv6 and endpoint.addr.is_ipv6(): + self.log.info('skip because IPv6 is disabled', endpoint=str(endpoint)) + return None - if not self.enable_ipv6 and entrypoint.addr.is_ipv6(): - self.log.info('skip because IPv6 is disabled', entrypoint=entrypoint) - return + if self.disable_ipv4 and endpoint.addr.is_ipv4(): + self.log.info('skip because IPv4 is disabled', endpoint=str(endpoint)) + return None - if self.disable_ipv4 and entrypoint.addr.is_ipv4(): - self.log.info('skip because IPv4 is disabled', entrypoint=entrypoint) - return + already_exists = self._connections.on_connecting(addr=endpoint.addr) + if already_exists: + self.log.debug('skipping because we are already connected(ing)
to this endpoint', endpoint=str(endpoint)) + return None - if use_ssl is None: - use_ssl = self.use_ssl - - endpoint = entrypoint.addr.to_client_endpoint(self.reactor) - - factory: IProtocolFactory - if use_ssl: - factory = TLSMemoryBIOFactory(self.my_peer.certificate_options, True, self.client_factory) - else: - factory = self.client_factory + factory: IProtocolFactory = self.client_factory + if self.use_ssl: + factory = TLSMemoryBIOFactory(self.my_peer.certificate_options, True, factory) if peer is not None: now = int(self.reactor.seconds()) peer.info.increment_retry_attempt(now) - deferred = endpoint.connect(factory) - self.connecting_peers[endpoint] = _ConnectingPeer(entrypoint, deferred) + peer_id = peer.id if peer else endpoint.peer_id + deferred = endpoint.addr.to_client_endpoint(self.reactor).connect(factory) + deferred \ + .addCallback(self._connect_to_callback, endpoint.addr, peer_id) \ + .addErrback(self.on_connection_failure, endpoint) - deferred.addCallback(self._connect_to_callback, peer, endpoint, entrypoint) - deferred.addErrback(self.on_connection_failure, peer, endpoint) - self.log.info('connecting to', entrypoint=str(entrypoint), peer=str(peer)) + self.log.info('connecting to', endpoint=str(endpoint), peer_id=str(peer_id)) self.pubsub.publish( HathorEvents.NETWORK_PEER_CONNECTING, peer=peer, peers_count=self._get_peers_count() ) + return deferred - def listen(self, description: str, use_ssl: Optional[bool] = None) -> None: + def listen(self, description: str) -> None: """ Start to listen for new connection according to the description. - If `ssl` is True, then the connection will be wraped by a TLS. 
- :Example: `manager.listen(description='tcp:8000')` @@ -705,14 +652,9 @@ def listen(self, description: str, use_ssl: Optional[bool] = None) -> None: """ endpoint = endpoints.serverFromString(self.reactor, description) - if use_ssl is None: - use_ssl = self.use_ssl - - factory: IProtocolFactory - if use_ssl: - factory = TLSMemoryBIOFactory(self.my_peer.certificate_options, False, self.server_factory) - else: - factory = self.server_factory + factory: IProtocolFactory = self.server_factory + if self.use_ssl: + factory = TLSMemoryBIOFactory(self.my_peer.certificate_options, False, factory) factory = NetfilterFactory(self, factory) @@ -749,42 +691,17 @@ def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Add hostname_entrypoint = PeerAddress.from_hostname_address(hostname, address) self.my_peer.info.entrypoints.append(hostname_entrypoint) - def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol: - """ When there are duplicate connections, determine which one should be dropped. - - We keep the connection initiated by the peer with larger id. A simple (peer_id1 > peer_id2) - on the peer id string is used for this comparison. 
- """ - assert protocol.peer is not None - assert protocol.peer.id is not None - assert protocol.my_peer.id is not None - other_connection = self.connected_peers[protocol.peer.id] - if bytes(protocol.my_peer.id) > bytes(protocol.peer.id): - # connection started by me is kept - if not protocol.inbound: - # other connection is dropped - return other_connection - else: - # this was started by peer, so drop it - return protocol - else: - # connection started by peer is kept - if not protocol.inbound: - return protocol - else: - return other_connection - def drop_connection(self, protocol: HathorProtocol) -> None: """ Drop a connection """ assert protocol.peer is not None self.log.debug('dropping connection', peer_id=protocol.peer.id, protocol=type(protocol).__name__) - protocol.send_error_and_close_connection('Connection droped') + protocol.send_error_and_close_connection('Connection dropped') def drop_connection_by_peer_id(self, peer_id: PeerId) -> None: """ Drop a connection by peer id """ - protocol = self.connected_peers.get(peer_id) + protocol = self.get_ready_peer_by_id(peer_id) if protocol: self.drop_connection(protocol) @@ -809,25 +726,24 @@ def set_always_enable_sync(self, values: list[PeerId]) -> None: self.log.info('update always_enable_sync', new=new, to_enable=to_enable, to_disable=to_disable) for peer_id in new: - if peer_id not in self.connected_peers: - continue - self.connected_peers[peer_id].enable_sync() + if peer := self.get_ready_peer_by_id(peer_id): + peer.enable_sync() for peer_id in to_disable: - if peer_id not in self.connected_peers: - continue - self.connected_peers[peer_id].disable_sync() + if peer := self.get_ready_peer_by_id(peer_id): + peer.disable_sync() self.always_enable_sync = new def _calculate_sync_rotate(self) -> _SyncRotateInfo: """Calculate new sync rotation.""" + ready_peers = self._connections.ready_peers().values() current_enabled: set[PeerId] = set() - for peer_id, conn in self.connected_peers.items(): + for conn in 
ready_peers: if conn.is_sync_enabled(): - current_enabled.add(peer_id) + current_enabled.add(conn.peer.id) - candidates = list(self.connected_peers.keys()) + candidates = [conn.peer.id for conn in ready_peers] self.rng.shuffle(candidates) selected_peers: set[PeerId] = set(candidates[:self.MAX_ENABLED_SYNC]) @@ -865,10 +781,14 @@ def _sync_rotate_if_needed(self, *, force: bool = False) -> None: ) for peer_id in info.to_disable: - self.connected_peers[peer_id].disable_sync() + peer = self.get_ready_peer_by_id(peer_id) + assert peer is not None + peer.disable_sync() for peer_id in info.to_enable: - self.connected_peers[peer_id].enable_sync() + peer = self.get_ready_peer_by_id(peer_id) + assert peer is not None + peer.enable_sync() def reload_entrypoints_and_connections(self) -> None: """Kill all connections and reload entrypoints from the original peer config file.""" diff --git a/hathor/p2p/peer.py b/hathor/p2p/peer.py index 8bc963b93..861eb797a 100644 --- a/hathor/p2p/peer.py +++ b/hathor/p2p/peer.py @@ -149,31 +149,15 @@ async def validate_entrypoint(self, protocol: HathorProtocol) -> bool: # Entrypoint validation with connection string and connection host # Entrypoints have the format tcp://IP|name:port for entrypoint in self.entrypoints: - if protocol.entrypoint is not None: - # Connection string has the format tcp://IP:port - # So we must consider that the entrypoint could be in name format - if protocol.entrypoint.addr == entrypoint: - return True - # TODO: don't use `daa.TEST_MODE` for this - test_mode = not_none(DifficultyAdjustmentAlgorithm.singleton).TEST_MODE - result = await discover_dns(entrypoint.host, test_mode) - if protocol.entrypoint.addr in [endpoint.addr for endpoint in result]: - return True - else: - # When the peer is the server part of the connection we don't have the full entrypoint description - # So we can only validate the host from the protocol - assert protocol.transport is not None - connection_remote = protocol.transport.getPeer() - 
connection_host = getattr(connection_remote, 'host', None) - if connection_host is None: - continue - # Connection host has only the IP - # So we must consider that the entrypoint could be in name format and we just validate the host - if connection_host == entrypoint.host: - return True - test_mode = not_none(DifficultyAdjustmentAlgorithm.singleton).TEST_MODE - result = await discover_dns(entrypoint.host, test_mode) - if connection_host in [entrypoint.addr.host for entrypoint in result]: + # Connection string has the format tcp://IP:port + # So we must consider that the entrypoint could be in name format + if protocol.addr == entrypoint: + return True + # TODO: don't use `daa.TEST_MODE` for this + test_mode = not_none(DifficultyAdjustmentAlgorithm.singleton).TEST_MODE + result = await discover_dns(entrypoint.host, test_mode) + for endpoint in result: + if protocol.addr == endpoint.addr: return True return False diff --git a/hathor/p2p/peer_connections.py b/hathor/p2p/peer_connections.py new file mode 100644 index 000000000..d42d4acc1 --- /dev/null +++ b/hathor/p2p/peer_connections.py @@ -0,0 +1,200 @@ +# Copyright 2024 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from dataclasses import dataclass + +from hathor.p2p.peer_endpoint import PeerAddress +from hathor.p2p.peer_id import PeerId +from hathor.p2p.protocol import HathorProtocol + + +@dataclass(slots=True, frozen=True, kw_only=True) +class PeerCounts: + """Simple wrapper for metrics.""" + connecting: int + handshaking: int + ready: int + + +class PeerConnections: + """ + This class represents all peer connections made by a ConnectionsManager. + It's also responsible for reacting to state changes on those connections. + """ + + __slots__ = ('_connecting_outbound', '_handshaking', '_ready', '_addr_by_id') + + def __init__(self) -> None: + # Peers that are in the "connecting" state, between starting a connection and Twisted calling `connectionMade`. + # This is only for outbound peers, that is, connections initiated by us. + # They're uniquely identified by the address we're connecting to. + self._connecting_outbound: set[PeerAddress] = set() + + # Peers that are handshaking, in a state after being connected and before reaching the READY state. + # They're uniquely identified by the address we're connected to. + self._handshaking: dict[PeerAddress, HathorProtocol] = {} + + # Peers that are in the READY state. + # They're uniquely identified by the address we're connected to. + # Note: there may be peers with duplicate PeerIds in this structure. + self._ready: dict[PeerAddress, HathorProtocol] = {} + + # Auxiliary structure for uniquely identifying READY peers by their PeerId. When there are peers with + # duplicate PeerIds, this identifies the connection we chose to keep.
+ self._addr_by_id: dict[PeerId, PeerAddress] = {} + + def connecting_outbound_peers(self) -> set[PeerAddress]: + """Get connecting outbound peers.""" + return self._connecting_outbound.copy() + + def handshaking_peers(self) -> dict[PeerAddress, HathorProtocol]: + """Get handshaking peers.""" + return self._handshaking.copy() + + def ready_peers(self) -> dict[PeerAddress, HathorProtocol]: + """Get ready peers, not including possible PeerId duplicates.""" + return { + addr: self._ready[addr] + for addr in self._addr_by_id.values() + } + + def not_ready_peers(self) -> list[PeerAddress]: + """Get not ready peers, that is, peers that are either connecting or handshaking.""" + return list(self._connecting_outbound) + list(self._handshaking) + + def connected_peers(self) -> dict[PeerAddress, HathorProtocol]: + """ + Get peers that are connected, that is, peers that are either handshaking or ready. + Does not include possible PeerId duplicates. + """ + return self.handshaking_peers() | self.ready_peers() + + def all_peers(self) -> list[PeerAddress]: + """Get all peers, ready or not. Does not include possible PeerId duplicates.""" + return self.not_ready_peers() + list(self.ready_peers()) + + def get_ready_peer_by_id(self, peer_id: PeerId) -> HathorProtocol | None: + """ + Get a ready peer by its PeerId. If there are connections with duplicate PeerIds, + we return the one that we chose to keep. + """ + addr = self._addr_by_id.get(peer_id) + return self._ready[addr] if addr else None + + def get_peer_counts(self) -> PeerCounts: + """Return the peer counts, for metrics.""" + return PeerCounts( + connecting=len(self._connecting_outbound), + handshaking=len(self._handshaking), + ready=len(self._ready), + ) + + def is_peer_ready(self, peer_id: PeerId) -> bool: + """Return whether a peer is ready, by its PeerId.""" + return peer_id in self._addr_by_id + + def on_connecting(self, *, addr: PeerAddress) -> bool: + """ + Callback for when an outbound connection is initiated. 
+ Returns True if this address already exists, either connecting or connected, and False otherwise.""" + if addr in self.all_peers(): + return True + + self._connecting_outbound.add(addr) + return False + + def on_failed_to_connect(self, *, addr: PeerAddress) -> None: + """Callback for when an outbound connection fails before getting connected.""" + assert addr in self._connecting_outbound + assert addr not in self.connected_peers() + self._connecting_outbound.remove(addr) + + def on_connected(self, *, protocol: HathorProtocol) -> None: + """Callback for when a connection (inbound or outbound) gets connected.""" + assert protocol.addr not in self.connected_peers() + + if protocol.inbound: + assert protocol.addr not in self._connecting_outbound + else: + assert protocol.addr in self._connecting_outbound + self._connecting_outbound.remove(protocol.addr) + + self._handshaking[protocol.addr] = protocol + + def on_handshake_disconnect(self, *, addr: PeerAddress) -> None: + """ + Callback for when a connection is closed during a handshaking state, that is, + after getting connected and before getting READY. + """ + assert addr not in self._connecting_outbound + assert addr in self._handshaking + assert addr not in self._ready + self._handshaking.pop(addr) + + def on_ready(self, *, addr: PeerAddress, peer_id: PeerId) -> HathorProtocol | None: + """ + Callback for when a connection gets to the READY state. + If the PeerId of this connection is duplicate, return the protocol that we should disconnect. + Return None otherwise. + """ + assert addr not in self._connecting_outbound + assert addr in self._handshaking + assert addr not in self._ready + + protocol = self._handshaking.pop(addr) + self._ready[addr] = protocol # We always index it by address, even if its PeerId is duplicate.
+ + connection_to_drop: HathorProtocol | None = None + + # If there's an existing connection with the same PeerId, this is a duplicate connection + if old_connection := self.get_ready_peer_by_id(protocol.peer.id): + # We choose to drop either the new or the old connection. + if self._should_drop_new_connection(protocol): + # We return early when we drop the new connection, + # so we don't override the old connection in _addr_by_id with it below. + return protocol + + # When dropping the old connection, we do override it in _addr_by_id below. + connection_to_drop = old_connection + + self._addr_by_id[peer_id] = addr + return connection_to_drop + + def on_ready_disconnect(self, *, addr: PeerAddress, peer_id: PeerId) -> None: + """Callback for when a connection is closed during the READY state.""" + assert addr not in self._connecting_outbound + assert addr not in self._handshaking + assert addr in self._ready + self._ready.pop(addr) + + if self._addr_by_id[peer_id] == addr: + self._addr_by_id.pop(peer_id) + + def on_unknown_disconnect(self, *, addr: PeerAddress) -> None: + """Callback for when a connection is closed during an unknown state.""" + assert addr not in self._handshaking + assert addr not in self._ready + if addr in self._connecting_outbound: + self._connecting_outbound.remove(addr) + + @staticmethod + def _should_drop_new_connection(new_conn: HathorProtocol) -> bool: + """ + When there are connections with duplicate PeerIds, determine which one should be dropped, the old or the new. + Return True if we should drop the new connection, and False otherwise. + + The logic to determine this is `(my_peer_id > other_peer_id) XNOR new_conn.inbound`. 
+ """ + my_peer_is_larger = bytes(new_conn.my_peer.id) > bytes(new_conn.peer.id) + return my_peer_is_larger == new_conn.inbound diff --git a/hathor/p2p/peer_discovery/bootstrap.py b/hathor/p2p/peer_discovery/bootstrap.py index 55b5e9f16..6e71f310e 100644 --- a/hathor/p2p/peer_discovery/bootstrap.py +++ b/hathor/p2p/peer_discovery/bootstrap.py @@ -15,6 +15,8 @@ from typing import Callable from structlog import get_logger +from twisted.internet.defer import Deferred +from twisted.internet.interfaces import IProtocol from typing_extensions import override from hathor.p2p.peer_endpoint import PeerEndpoint @@ -37,6 +39,6 @@ def __init__(self, entrypoints: list[PeerEndpoint]): self.entrypoints = entrypoints @override - async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None: + async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], Deferred[IProtocol] | None]) -> None: for entrypoint in self.entrypoints: connect_to(entrypoint) diff --git a/hathor/p2p/peer_discovery/dns.py b/hathor/p2p/peer_discovery/dns.py index c5dfe74d6..0debde977 100644 --- a/hathor/p2p/peer_discovery/dns.py +++ b/hathor/p2p/peer_discovery/dns.py @@ -19,6 +19,7 @@ from structlog import get_logger from twisted.internet.defer import Deferred, gatherResults +from twisted.internet.interfaces import IProtocol from twisted.names.client import lookupAddress, lookupText from twisted.names.dns import Record_A, Record_TXT, RRHeader from typing_extensions import override @@ -53,7 +54,7 @@ def do_lookup_text(self, host: str) -> Deferred[LookupResult]: return lookupText(host) @override - async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None: + async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], Deferred[IProtocol] | None]) -> None: """ Run DNS lookup for host and connect to it This is executed when starting the DNS Peer Discovery and first connecting to the network """ diff --git 
a/hathor/p2p/peer_discovery/peer_discovery.py b/hathor/p2p/peer_discovery/peer_discovery.py index 7d040fae2..49019e739 100644 --- a/hathor/p2p/peer_discovery/peer_discovery.py +++ b/hathor/p2p/peer_discovery/peer_discovery.py @@ -15,6 +15,9 @@ from abc import ABC, abstractmethod from typing import Callable +from twisted.internet.defer import Deferred +from twisted.internet.interfaces import IProtocol + from hathor.p2p.peer_endpoint import PeerEndpoint @@ -23,7 +26,7 @@ class PeerDiscovery(ABC): """ @abstractmethod - async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None: + async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], Deferred[IProtocol] | None]) -> None: """ This method must discover the peers and call `connect_to` for each of them. :param connect_to: Function which will be called for each discovered peer. diff --git a/hathor/p2p/peer_endpoint.py b/hathor/p2p/peer_endpoint.py index 62e4624a2..c843eae70 100644 --- a/hathor/p2p/peer_endpoint.py +++ b/hathor/p2p/peer_endpoint.py @@ -167,7 +167,7 @@ def from_hostname_address(cls, hostname: str, address: IPv4Address | IPv6Address @classmethod def from_address(cls, address: IAddress) -> Self: - """Create an Entrypoint from a Twisted IAddress.""" + """Create a PeerAddress from a Twisted IAddress.""" if not isinstance(address, (IPv4Address, IPv6Address)): raise NotImplementedError(f'address: {address}') return cls.parse(f'{address.type}://{address.host}:{address.port}') diff --git a/hathor/p2p/protocol.py b/hathor/p2p/protocol.py index cd90601e8..670dcf363 100644 --- a/hathor/p2p/protocol.py +++ b/hathor/p2p/protocol.py @@ -26,8 +26,8 @@ from hathor.conf.settings import HathorSettings from hathor.p2p.messages import ProtocolMessages -from hathor.p2p.peer import PrivatePeer, PublicPeer, UnverifiedPeer -from hathor.p2p.peer_endpoint import PeerEndpoint +from hathor.p2p.peer import PrivatePeer, PublicPeer +from hathor.p2p.peer_endpoint import PeerAddress from 
hathor.p2p.peer_id import PeerId from hathor.p2p.rate_limiter import RateLimiter from hathor.p2p.states import BaseState, HelloState, PeerIdState, ReadyState @@ -83,7 +83,6 @@ class WarningFlags(str, Enum): state: Optional[BaseState] connection_time: float _state_instances: dict[PeerState, BaseState] - entrypoint: Optional[PeerEndpoint] warning_flags: set[str] aborting: bool diff_timestamp: Optional[int] @@ -104,10 +103,12 @@ def __init__( settings: HathorSettings, use_ssl: bool, inbound: bool, + addr: PeerAddress, ) -> None: self._settings = settings self.my_peer = my_peer self.connections = p2p_manager + self.addr = addr assert p2p_manager.manager is not None self.node = p2p_manager.manager @@ -147,10 +148,6 @@ def __init__( self.ratelimit: RateLimiter = RateLimiter(self.reactor) # self.ratelimit.set_limit(self.RateLimitKeys.GLOBAL, 120, 60) - # Connection string of the peer - # Used to validate if entrypoints has this string - self.entrypoint: Optional[PeerEndpoint] = None - # Peer id sent in the connection url that is expected to connect (optional) self.expected_peer_id: PeerId | None = None @@ -254,14 +251,11 @@ def on_connect(self) -> None: if self.connections: self.connections.on_peer_connect(self) - def on_outbound_connect(self, entrypoint: PeerEndpoint, peer: UnverifiedPeer | PublicPeer | None) -> None: + def on_outbound_connect(self, peer_id: PeerId | None) -> None: """Called when we successfully establish an outbound connection to a peer.""" - # Save the used entrypoint in protocol so we can validate that it matches the entrypoints data - if entrypoint.peer_id is not None and peer is not None: - assert entrypoint.peer_id == peer.id - - self.expected_peer_id = peer.id if peer else entrypoint.peer_id - self.entrypoint = entrypoint + # Save the peer_id so we can validate that it matches the one we'll receive in the PEER-ID state + assert not self.inbound + self.expected_peer_id = peer_id def on_peer_ready(self) -> None: assert self.connections is not None 
@@ -282,11 +276,33 @@ def on_disconnect(self, reason: Failure) -> None: self._idle_timeout_call_later = None self.aborting = True self.update_log_context() - if self.state: - self.state.on_exit() + + if not self.state: + # TODO: This should never happen, it can only happen if an exception was raised in the middle of our + # connection callback (connectionMade/on_connect). In that case, we may have not initialized our state + # yet. We should improve this by making an initial non-None state. + self.log.error( + 'disconnecting protocol with no state. check for previous exceptions', + addr=str(self.addr), + peer_id=str(self.get_peer_id()), + ) + self.connections.on_unknown_disconnect(addr=self.addr) + return + self.state.on_exit() + state_name = self.state.state_name + + if self.is_state(self.PeerState.HELLO) or self.is_state(self.PeerState.PEER_ID): self.state = None - if self.connections: - self.connections.on_peer_disconnect(self) + self.connections.on_handshake_disconnect(addr=self.addr) + return + + if self.is_state(self.PeerState.READY): + self.state = None + self.connections.on_ready_disconnect(addr=self.addr, peer_id=self.peer.id) + return + + self.state = None + raise AssertionError(f'disconnected in unexpected state: {state_name or "unknown"}') def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None: """ A generic message which must be implemented to send a message @@ -401,6 +417,10 @@ class HathorLineReceiver(LineReceiver, HathorProtocol): """ MAX_LENGTH = 65536 + def makeConnection(self, transport: ITransport) -> None: + assert self.addr == PeerAddress.from_address(transport.getPeer()) + super().makeConnection(transport) + def connectionMade(self) -> None: super(HathorLineReceiver, self).connectionMade() self.setLineMode() diff --git a/hathor/p2p/resources/add_peers.py b/hathor/p2p/resources/add_peers.py index c8faeb5dc..a7aa3f212 100644 --- a/hathor/p2p/resources/add_peers.py +++ b/hathor/p2p/resources/add_peers.py @@ -72,7 
+72,7 @@ def render_POST(self, request: Request) -> bytes: def already_connected(endpoint: PeerEndpoint) -> bool: # ignore peers that we're already trying to connect for ready_endpoint in self.manager.connections.iter_not_ready_endpoints(): - if endpoint.addr == ready_endpoint.addr: + if endpoint.addr == ready_endpoint: return True # remove peers we already know about diff --git a/hathor/p2p/resources/status.py b/hathor/p2p/resources/status.py index 68edb9f0e..896220b9a 100644 --- a/hathor/p2p/resources/status.py +++ b/hathor/p2p/resources/status.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.web.http import Request + import hathor from hathor.api_util import Resource, set_cors from hathor.cli.openapi_files.register import register_resource @@ -34,7 +36,7 @@ def __init__(self, manager): self.manager = manager self.reactor = manager.reactor - def render_GET(self, request): + def render_GET(self, request: Request) -> bytes: request.setHeader(b'content-type', b'application/json; charset=utf-8') set_cors(request, 'GET') @@ -42,17 +44,14 @@ def render_GET(self, request): connecting_peers = [] # TODO: refactor as not to use a private item - for endpoint, deferred in self.manager.connections.connecting_peers.items(): - host = getattr(endpoint, '_host', '') - port = getattr(endpoint, '_port', '') - connecting_peers.append({'deferred': str(deferred), 'address': '{}:{}'.format(host, port)}) + for address in self.manager.connections.iter_connecting_outbound_peers(): + connecting_peers.append({'address': str(address)}) handshaking_peers = [] # TODO: refactor as not to use a private item - for conn in self.manager.connections.handshaking_peers: - remote = conn.transport.getPeer() + for conn in self.manager.connections.iter_handshaking_peers(): handshaking_peers.append({ - 'address': '{}:{}'.format(remote.host, remote.port), + 'address': str(conn.addr), 'state': conn.state.state_name, 
'uptime': now - conn.connection_time, 'app_version': conn.app_version, @@ -60,7 +59,6 @@ def render_GET(self, request): connected_peers = [] for conn in self.manager.connections.iter_ready_connections(): - remote = conn.transport.getPeer() status = {} status[conn.state.sync_agent.name] = conn.state.sync_agent.get_status() connected_peers.append({ @@ -68,7 +66,7 @@ def render_GET(self, request): 'app_version': conn.app_version, 'current_time': now, 'uptime': now - conn.connection_time, - 'address': '{}:{}'.format(remote.host, remote.port), + 'address': str(conn.addr), 'state': conn.state.state_name, # 'received_bytes': conn.received_bytes, 'rtt': list(conn.state.rtt_window), @@ -134,7 +132,7 @@ def render_GET(self, request): 'id': '5578ab3bcaa861fb9d07135b8b167dd230d4487b147be8fd2c94a79bd349d123', 'app_version': 'Hathor v0.14.0-beta', 'uptime': 118.37029600143433, - 'address': '192.168.1.1:54321', + 'address': 'tcp://192.168.1.1:54321', 'state': 'READY', 'last_message': 1539271481, 'plugins': { @@ -149,8 +147,7 @@ def render_GET(self, request): 'peer_best_blockchain': [_openapi_height_info] } _openapi_connecting_peer = { - 'deferred': '>', # noqa - 'address': '192.168.1.1:54321' + 'address': 'tcp://192.168.1.1:54321' } StatusResource.openapi = { @@ -201,7 +198,7 @@ def render_GET(self, request): 'connected_peers': [_openapi_connected_peer], 'handshaking_peers': [ { - 'address': '192.168.1.1:54321', + 'address': 'tcp://192.168.1.1:54321', 'state': 'HELLO', 'uptime': 0.0010249614715576172, 'app_version': 'Unknown' diff --git a/hathor/p2p/states/peer_id.py b/hathor/p2p/states/peer_id.py index e46e62ce9..b61cb9366 100644 --- a/hathor/p2p/states/peer_id.py +++ b/hathor/p2p/states/peer_id.py @@ -123,7 +123,7 @@ async def handle_peer_id(self, payload: str) -> None: return if protocol.connections is not None: - if protocol.connections.is_peer_connected(peer.id): + if protocol.connections.is_peer_ready(peer.id): protocol.send_error_and_close_connection('We are already 
connected.') return @@ -132,9 +132,6 @@ async def handle_peer_id(self, payload: str) -> None: protocol.send_error_and_close_connection('Connection string is not in the entrypoints.') return - if protocol.entrypoint is not None and protocol.entrypoint.peer_id is not None: - assert protocol.entrypoint.peer_id == peer.id - if protocol.use_ssl: certificate_valid = peer.validate_certificate(protocol) if not certificate_valid: diff --git a/hathor/prometheus.py b/hathor/prometheus.py index 5418c44d4..965d68443 100644 --- a/hathor/prometheus.py +++ b/hathor/prometheus.py @@ -183,7 +183,11 @@ def set_new_metrics(self) -> None: """ Update metric_gauges dict with new data from metrics """ for metric_name in METRIC_INFO.keys(): - self.metric_gauges[metric_name].set(getattr(self.metrics, metric_name)) + metric_attr = metric_name + if metric_attr == 'connected_peers': + # TODO: Improve this. Temporary backwards compatibility workaround after a rename in metrics. + metric_attr = 'ready_peers' + self.metric_gauges[metric_name].set(getattr(self.metrics, metric_attr)) self._set_rocksdb_tx_storage_metrics() self._set_new_peer_connection_metrics() diff --git a/hathor/simulator/fake_connection.py b/hathor/simulator/fake_connection.py index 3c030c901..c5741c59e 100644 --- a/hathor/simulator/fake_connection.py +++ b/hathor/simulator/fake_connection.py @@ -103,6 +103,14 @@ def entrypoint(self) -> PeerEndpoint: return entrypoint.with_id(self.manager1.my_peer.id) return entrypoint.with_id(self._fake_bootstrap_id) + @property + def peer_addr1(self) -> PeerAddress: + return PeerAddress.from_address(self.addr1) + + @property + def peer_addr2(self) -> PeerAddress: + return PeerAddress.from_address(self.addr2) + @property def proto1(self): return self._proto1 @@ -277,12 +285,9 @@ def reconnect(self) -> None: # When _fake_bootstrap_id is set we don't pass the peer because that's how bootstrap calls connect_to() peer = self._proto1.my_peer.to_unverified_peer() if self._fake_bootstrap_id is False 
else None - self.manager2.connections.connect_to(self.entrypoint, peer) - - connecting_peers = list(self.manager2.connections.connecting_peers.values()) - for connecting_peer in connecting_peers: - if connecting_peer.entrypoint.addr == self.entrypoint.addr: - connecting_peer.endpoint_deferred.callback(self._proto2) + deferred = self.manager2.connections.connect_to(self.entrypoint, peer) + assert deferred is not None + deferred.callback(self._proto2) self.tr1 = HathorStringTransport(self._proto2.my_peer, peer_address=self.addr2) self.tr2 = HathorStringTransport(self._proto1.my_peer, peer_address=self.addr1) diff --git a/hathor/sysctl/p2p/manager.py b/hathor/sysctl/p2p/manager.py index 9f9856a42..28d009607 100644 --- a/hathor/sysctl/p2p/manager.py +++ b/hathor/sysctl/p2p/manager.py @@ -235,7 +235,7 @@ def set_kill_connection(self, peer_id: str, force: bool = False) -> None: peer_id_obj = PeerId(peer_id) except ValueError: raise SysctlException('invalid peer-id') - conn = self.connections.connected_peers.get(peer_id_obj, None) + conn = self.connections.get_ready_peer_by_id(peer_id_obj) if conn is None: self.log.warn('Killing connection', peer_id=peer_id) raise SysctlException('peer-id is not connected') diff --git a/hathor/websocket/factory.py b/hathor/websocket/factory.py index b96dcbdff..a8f432dd5 100644 --- a/hathor/websocket/factory.py +++ b/hathor/websocket/factory.py @@ -158,7 +158,7 @@ def _send_metrics(self): 'blocks': self.metrics.blocks, 'best_block_height': self.metrics.best_block_height, 'hash_rate': self.metrics.hash_rate, - 'peers': self.metrics.connected_peers, + 'peers': self.metrics.ready_peers, 'type': 'dashboard:metrics', 'time': self.reactor.seconds(), }) diff --git a/tests/others/test_metrics.py b/tests/others/test_metrics.py index b46f6985b..45ca40631 100644 --- a/tests/others/test_metrics.py +++ b/tests/others/test_metrics.py @@ -5,7 +5,7 @@ from hathor.p2p.manager import PeerConnectionsMetrics from hathor.p2p.peer import PrivatePeer -from 
hathor.p2p.peer_endpoint import PeerEndpoint +from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint from hathor.p2p.protocol import HathorProtocol from hathor.pubsub import HathorEvents from hathor.simulator.utils import add_new_blocks @@ -42,7 +42,7 @@ def test_p2p_network_events(self): # Assertion self.assertEquals(manager.metrics.connecting_peers, 3) self.assertEquals(manager.metrics.handshaking_peers, 4) - self.assertEquals(manager.metrics.connected_peers, 5) + self.assertEquals(manager.metrics.ready_peers, 5) self.assertEquals(manager.metrics.known_peers, 6) manager.metrics.stop() @@ -60,25 +60,35 @@ def test_connections_manager_integration(self): wallet = Wallet(directory=tmpdir) wallet.unlock(b'teste') manager = self.create_peer('testnet', tx_storage=tx_storage, wallet=wallet) + p2p_manager = manager.connections - manager.connections.verified_peer_storage.update({ + p2p_manager.verified_peer_storage.update({ "1": PrivatePeer.auto_generated(), "2": PrivatePeer.auto_generated(), "3": PrivatePeer.auto_generated(), }) - manager.connections.connected_peers.update({"1": Mock(), "2": Mock()}) - manager.connections.handshaking_peers.update({Mock()}) + peer1 = Mock() + peer1.addr = PeerAddress.parse('tcp://localhost:40403') + peer2 = Mock() + peer2.addr = PeerAddress.parse('tcp://localhost:40404') + peer3 = Mock() + peer3.addr = PeerAddress.parse('tcp://localhost:40405') + p2p_manager._connections.on_connected(protocol=peer1) + p2p_manager._connections.on_connected(protocol=peer2) + p2p_manager._connections.on_connected(protocol=peer3) + p2p_manager._connections.on_ready(addr=peer1.addr, peer_id=Mock()) + p2p_manager._connections.on_ready(addr=peer2.addr, peer_id=Mock()) # Execution endpoint = PeerEndpoint.parse('tcp://127.0.0.1:8005') # This will trigger sending to the pubsub one of the network events - manager.connections.connect_to(endpoint, use_ssl=True) + manager.connections.connect_to(endpoint) self.run_to_completion() # Assertion 
self.assertEquals(manager.metrics.known_peers, 3) - self.assertEquals(manager.metrics.connected_peers, 2) + self.assertEquals(manager.metrics.ready_peers, 2) self.assertEquals(manager.metrics.handshaking_peers, 1) self.assertEquals(manager.metrics.connecting_peers, 1) @@ -215,18 +225,20 @@ def test_peer_connections_data_collection(self): self.use_memory_storage = True manager = self.create_peer('testnet') self.assertIsInstance(manager.tx_storage, TransactionMemoryStorage) - - my_peer = manager.my_peer + port = 40403 def build_hathor_protocol(): + nonlocal port protocol = HathorProtocol( - my_peer=my_peer, + my_peer=manager.my_peer, p2p_manager=manager.connections, use_ssl=False, inbound=False, - settings=self._settings + settings=self._settings, + addr=PeerAddress.parse(f'tcp://localhost:{port}') ) protocol._peer = PrivatePeer.auto_generated().to_public_peer() + port += 1 return protocol @@ -246,9 +258,12 @@ def build_hathor_protocol(): fake_peers[2].metrics.discarded_blocks = 3 fake_peers[2].metrics.discarded_txs = 3 - manager.connections.connections.add(fake_peers[0]) - manager.connections.connections.add(fake_peers[1]) - manager.connections.connections.add(fake_peers[2]) + manager.connections._connections._addr_by_id[fake_peers[0].peer.id] = fake_peers[0].addr + manager.connections._connections._addr_by_id[fake_peers[1].peer.id] = fake_peers[1].addr + manager.connections._connections._addr_by_id[fake_peers[2].peer.id] = fake_peers[2].addr + manager.connections._connections._ready[fake_peers[0].addr] = fake_peers[0] + manager.connections._connections._ready[fake_peers[1].addr] = fake_peers[1] + manager.connections._connections._ready[fake_peers[2].addr] = fake_peers[2] # Execution manager.metrics._collect_data() diff --git a/tests/p2p/test_bootstrap.py b/tests/p2p/test_bootstrap.py index 9855a0fda..26dae9097 100644 --- a/tests/p2p/test_bootstrap.py +++ b/tests/p2p/test_bootstrap.py @@ -1,6 +1,7 @@ from typing import Callable from twisted.internet.defer import 
Deferred +from twisted.internet.interfaces import IProtocol from twisted.names.dns import TXT, A, Record_A, Record_TXT, RRHeader from typing_extensions import override @@ -9,28 +10,38 @@ from hathor.p2p.peer_discovery import DNSPeerDiscovery, PeerDiscovery from hathor.p2p.peer_discovery.dns import LookupResult from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint, Protocol +from hathor.p2p.peer_id import PeerId from hathor.pubsub import PubSubManager from tests import unittest from tests.test_memory_reactor_clock import TestMemoryReactorClock class MockPeerDiscovery(PeerDiscovery): - def __init__(self, mocked_host_ports: list[tuple[str, int]]): - self.mocked_host_ports = mocked_host_ports + def __init__(self, mocked_addrs: list[tuple[str, int, str | None]]): + self.mocked_addrs = mocked_addrs @override - async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None: - for host, port in self.mocked_host_ports: - addr = PeerAddress(Protocol.TCP, host, port) - connect_to(addr.with_id()) + async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], Deferred[IProtocol] | None]) -> None: + for host, port, peer_id_str in self.mocked_addrs: + peer_id = PeerId(peer_id_str) if peer_id_str is not None else None + connect_to(PeerAddress(Protocol.TCP, host, port).with_id(peer_id)) class MockDNSPeerDiscovery(DNSPeerDiscovery): - def __init__(self, reactor: TestMemoryReactorClock, bootstrap_txt: list[tuple[str, int]], bootstrap_a: list[str]): + def __init__( + self, + reactor: TestMemoryReactorClock, + bootstrap_txt: list[tuple[str, int, str | None]], + bootstrap_a: list[str], + ): super().__init__(['test.example']) self.reactor = reactor self.mocked_lookup_a = [RRHeader(type=A, payload=Record_A(address)) for address in bootstrap_a] - txt_entries = [f'tcp://{h}:{p}'.encode() for h, p in bootstrap_txt] + txt_entries = [] + for host, port, peer_id_str in bootstrap_txt: + peer_id = PeerId(peer_id_str) if peer_id_str is not None else 
None + addr_and_id = PeerAddress(Protocol.TCP, host, port).with_id(peer_id) + txt_entries.append(str(addr_and_id).encode()) self.mocked_lookup_txt = [RRHeader(type=TXT, payload=Record_TXT(*txt_entries))] def do_lookup_address(self, host: str) -> Deferred[LookupResult]: @@ -63,23 +74,27 @@ def test_mock_discovery(self) -> None: ) host_ports1 = [ - ('foobar', 1234), - ('127.0.0.99', 9999), + ('foobar', 1234, None), + ('127.0.0.99', 9999, None), + ('192.168.0.1', 1111, 'c0f19299c2a4dcbb6613a14011ff07b63d6cb809e4cee25e9c1ccccdd6628696') ] host_ports2 = [ - ('baz', 456), - ('127.0.0.88', 8888), + ('baz', 456, None), + ('127.0.0.88', 8888, None), + ('192.168.0.2', 2222, 'bc5119d47bb4ea7c19100bd97fb11f36970482108bd3d45ff101ee4f6bbec872') ] connections.add_peer_discovery(MockPeerDiscovery(host_ports1)) connections.add_peer_discovery(MockPeerDiscovery(host_ports2)) connections.do_discovery() self.clock.advance(1) - connecting_entrypoints = {str(entrypoint) for entrypoint, _ in connections.connecting_peers.values()} - self.assertEqual(connecting_entrypoints, { + connecting_addrs = {str(addr) for addr in connections._connections.connecting_outbound_peers()} + self.assertEqual(connecting_addrs, { 'tcp://foobar:1234', 'tcp://127.0.0.99:9999', 'tcp://baz:456', 'tcp://127.0.0.88:8888', + 'tcp://192.168.0.1:1111', + 'tcp://192.168.0.2:2222', }) def test_dns_discovery(self) -> None: @@ -102,16 +117,18 @@ def test_dns_discovery(self) -> None: '127.0.0.88', ] bootstrap_txt = [ - ('foobar', 1234), - ('baz', 456), + ('foobar', 1234, None), + ('baz', 456, None), + ('192.168.0.1', 1111, 'c0f19299c2a4dcbb6613a14011ff07b63d6cb809e4cee25e9c1ccccdd6628696') ] connections.add_peer_discovery(MockDNSPeerDiscovery(self.clock, bootstrap_txt, bootstrap_a)) connections.do_discovery() self.clock.advance(1) - connecting_entrypoints = {str(entrypoint) for entrypoint, _ in connections.connecting_peers.values()} - self.assertEqual(connecting_entrypoints, { + connecting_addrs = {str(addr) for addr in 
connections._connections.connecting_outbound_peers()} + self.assertEqual(connecting_addrs, { 'tcp://127.0.0.99:40403', 'tcp://127.0.0.88:40403', 'tcp://foobar:1234', 'tcp://baz:456', + 'tcp://192.168.0.1:1111' }) diff --git a/tests/p2p/test_connections.py b/tests/p2p/test_connections.py index db5a85f1e..2c8a5510c 100644 --- a/tests/p2p/test_connections.py +++ b/tests/p2p/test_connections.py @@ -18,11 +18,11 @@ def test_manager_connections(self) -> None: manager: HathorManager = self.create_peer('testnet', enable_sync_v1=True, enable_sync_v2=False) endpoint = PeerEndpoint.parse('tcp://127.0.0.1:8005') - manager.connections.connect_to(endpoint, use_ssl=True) + manager.connections.connect_to(endpoint) - self.assertIn(endpoint, manager.connections.iter_not_ready_endpoints()) - self.assertNotIn(endpoint, manager.connections.iter_ready_connections()) - self.assertNotIn(endpoint, manager.connections.iter_all_connections()) + self.assertIn(endpoint.addr, manager.connections.iter_not_ready_endpoints()) + self.assertNotIn(endpoint.addr, [conn.addr for conn in manager.connections.iter_ready_connections()]) + self.assertNotIn(endpoint.addr, [conn.addr for conn in manager.connections.get_connected_peers()]) def test_manager_disabled_ipv6(self) -> None: """Should not try to connect to ipv6 peers if ipv6 is disabled""" @@ -36,11 +36,11 @@ def test_manager_disabled_ipv6(self) -> None: ) endpoint = PeerEndpoint.parse('tcp://[::1]:8005') - manager.connections.connect_to(endpoint, use_ssl=True) + manager.connections.connect_to(endpoint) - self.assertNotIn(endpoint, manager.connections.iter_not_ready_endpoints()) - self.assertNotIn(endpoint, manager.connections.iter_ready_connections()) - self.assertNotIn(endpoint, manager.connections.iter_all_connections()) + self.assertNotIn(endpoint.addr, manager.connections.iter_not_ready_endpoints()) + self.assertNotIn(endpoint.addr, [conn.addr for conn in manager.connections.iter_ready_connections()]) + self.assertNotIn(endpoint.addr, [conn.addr 
for conn in manager.connections.get_connected_peers()]) def test_manager_enabled_ipv6_and_ipv4(self) -> None: """Should connect to both ipv4 and ipv6 peers if both are enabled""" @@ -54,23 +54,17 @@ def test_manager_enabled_ipv6_and_ipv4(self) -> None: ) endpoint_ipv6 = PeerEndpoint.parse('tcp://[::3:2:1]:8005') - manager.connections.connect_to(endpoint_ipv6, use_ssl=True) + manager.connections.connect_to(endpoint_ipv6) endpoint_ipv4 = PeerEndpoint.parse('tcp://1.2.3.4:8005') - manager.connections.connect_to(endpoint_ipv4, use_ssl=True) + manager.connections.connect_to(endpoint_ipv4) - self.assertIn( - endpoint_ipv4.addr.host, - list(map(lambda x: x.addr.host, manager.connections.iter_not_ready_endpoints())) - ) - self.assertIn( - endpoint_ipv6.addr.host, - list(map(lambda x: x.addr.host, manager.connections.iter_not_ready_endpoints())) - ) + self.assertIn(endpoint_ipv4.addr, manager.connections.iter_not_ready_endpoints()) + self.assertIn(endpoint_ipv6.addr, manager.connections.iter_not_ready_endpoints()) self.assertEqual(2, len(list(manager.connections.iter_not_ready_endpoints()))) self.assertEqual(0, len(list(manager.connections.iter_ready_connections()))) - self.assertEqual(0, len(list(manager.connections.iter_all_connections()))) + self.assertEqual(0, len(list(manager.connections.get_connected_peers()))) def test_manager_disabled_ipv4(self) -> None: """Should not try to connect to ipv4 peers if ipv4 is disabled""" @@ -84,8 +78,8 @@ def test_manager_disabled_ipv4(self) -> None: ) endpoint = PeerEndpoint.parse('tcp://127.0.0.1:8005') - manager.connections.connect_to(endpoint, use_ssl=True) + manager.connections.connect_to(endpoint) self.assertEqual(0, len(list(manager.connections.iter_not_ready_endpoints()))) self.assertEqual(0, len(list(manager.connections.iter_ready_connections()))) - self.assertEqual(0, len(list(manager.connections.iter_all_connections()))) + self.assertEqual(0, len(list(manager.connections.get_connected_peers()))) diff --git 
a/tests/p2p/test_get_best_blockchain.py b/tests/p2p/test_get_best_blockchain.py index ff0d95149..42bd0c4ce 100644 --- a/tests/p2p/test_get_best_blockchain.py +++ b/tests/p2p/test_get_best_blockchain.py @@ -32,8 +32,8 @@ def test_get_best_blockchain(self) -> None: self.simulator.add_connection(conn12) self.simulator.run(3600) - connected_peers1 = list(manager1.connections.connected_peers.values()) - connected_peers2 = list(manager2.connections.connected_peers.values()) + connected_peers1 = list(manager1.connections.iter_ready_connections()) + connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers1)) self.assertEqual(1, len(connected_peers2)) @@ -94,13 +94,13 @@ def test_handle_get_best_blockchain(self) -> None: self.assertTrue(self.simulator.run(7200, trigger=trigger)) miner.stop() - connected_peers1 = list(manager1.connections.connected_peers.values()) + connected_peers1 = list(manager1.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers1)) protocol2 = connected_peers1[0] state2 = protocol2.state assert isinstance(state2, ReadyState) - connected_peers2 = list(manager2.connections.connected_peers.values()) + connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] state1 = protocol1.state @@ -134,7 +134,7 @@ def test_handle_get_best_blockchain(self) -> None: self.assertFalse(conn12.tr1.disconnecting) self.assertFalse(conn12.tr2.disconnecting) - connected_peers2 = list(manager2.connections.connected_peers.values()) + connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] state1 = protocol1.state @@ -153,13 +153,13 @@ def test_handle_best_blockchain(self) -> None: self.simulator.add_connection(conn12) self.simulator.run(60) - connected_peers1 = list(manager1.connections.connected_peers.values()) + 
connected_peers1 = list(manager1.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers1)) protocol2 = connected_peers1[0] state2 = protocol2.state assert isinstance(state2, ReadyState) - connected_peers2 = list(manager2.connections.connected_peers.values()) + connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] state1 = protocol1.state @@ -215,9 +215,9 @@ def test_node_without_get_best_blockchain_capability(self) -> None: self.simulator.run(60) # assert the nodes are connected - connected_peers1 = list(manager1.connections.connected_peers.values()) + connected_peers1 = list(manager1.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers1)) - connected_peers2 = list(manager2.connections.connected_peers.values()) + connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) # assert the peers have the proper capabilities @@ -313,13 +313,13 @@ def test_stop_looping_on_exit(self) -> None: self.simulator.add_connection(conn12) self.simulator.run(60) - connected_peers1 = list(manager1.connections.connected_peers.values()) + connected_peers1 = list(manager1.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers1)) protocol2 = connected_peers1[0] state2 = protocol2.state assert isinstance(state2, ReadyState) - connected_peers2 = list(manager2.connections.connected_peers.values()) + connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] state1 = protocol1.state diff --git a/tests/p2p/test_peer_id.py b/tests/p2p/test_peer_id.py index 1f95cbd12..9dcf7fedf 100644 --- a/tests/p2p/test_peer_id.py +++ b/tests/p2p/test_peer_id.py @@ -2,13 +2,13 @@ import shutil import tempfile from typing import cast -from unittest.mock import Mock import pytest +from twisted.internet.address 
import IPv4Address from twisted.internet.interfaces import ITransport from hathor.p2p.peer import InvalidPeerIdException, PrivatePeer, PublicPeer, UnverifiedPeer -from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint +from hathor.p2p.peer_endpoint import PeerAddress from hathor.p2p.peer_id import PeerId from hathor.p2p.peer_storage import VerifiedPeerStorage from tests import unittest @@ -149,7 +149,7 @@ def test_retry_connection(self) -> None: def test_validate_certificate(self) -> None: builder = TestBuilder() artifacts = builder.build() - protocol = artifacts.p2p_manager.server_factory.buildProtocol(Mock()) + protocol = artifacts.p2p_manager.server_factory.buildProtocol(IPv4Address('TCP', 'localhost', 40403)) peer = PrivatePeer.auto_generated() @@ -268,8 +268,8 @@ async def test_validate_entrypoint(self) -> None: peer.info.entrypoints = [PeerAddress.parse('tcp://127.0.0.1:40403')] # we consider that we are starting the connection to the peer - protocol = manager.connections.client_factory.buildProtocol('127.0.0.1') - protocol.entrypoint = PeerEndpoint.parse('tcp://127.0.0.1:40403') + protocol = manager.connections.client_factory.buildProtocol(IPv4Address('TCP', 'localhost', 40403)) + protocol.addr = PeerAddress.parse('tcp://127.0.0.1:40403') result = await peer.info.validate_entrypoint(protocol) self.assertTrue(result) # if entrypoint is an URI @@ -277,33 +277,21 @@ async def test_validate_entrypoint(self) -> None: result = await peer.info.validate_entrypoint(protocol) self.assertTrue(result) # if entrypoint is an IPv6 - peer.entrypoints = [PeerEndpoint.parse('tcp://[::1]:40403')] + peer.entrypoints = [PeerAddress.parse('tcp://[::1]:40403')] result = await peer.info.validate_entrypoint(protocol) self.assertTrue(result) # test invalid. 
DNS in test mode will resolve to '127.0.0.1:40403' - protocol.entrypoint = PeerEndpoint.parse('tcp://45.45.45.45:40403') + protocol.addr = PeerAddress.parse('tcp://45.45.45.45:40403') result = await peer.info.validate_entrypoint(protocol) self.assertFalse(result) - # now test when receiving the connection - i.e. the peer starts it - protocol.entrypoint = None - peer.info.entrypoints = [PeerAddress.parse('tcp://127.0.0.1:40403')] - - from collections import namedtuple - DummyPeer = namedtuple('DummyPeer', 'host') - - class FakeTransport: - def getPeer(self) -> DummyPeer: - return DummyPeer(host='127.0.0.1') - protocol.transport = FakeTransport() - result = await peer.info.validate_entrypoint(protocol) - self.assertTrue(result) # if entrypoint is an URI + protocol.addr = PeerAddress.parse('tcp://127.0.0.1:40403') peer.info.entrypoints = [PeerAddress.parse('tcp://uri_name:40403')] result = await peer.info.validate_entrypoint(protocol) self.assertTrue(result) # if entrypoint is an IPv6 - peer.entrypoints = [PeerEndpoint.parse('tcp://[2001:db8::ff00:42:8329]:40403')] + peer.entrypoints = [PeerAddress.parse('tcp://[2001:db8::ff00:42:8329]:40403')] result = await peer.info.validate_entrypoint(protocol) self.assertTrue(result) diff --git a/tests/p2p/test_protocol.py b/tests/p2p/test_protocol.py index 708af1f0d..cae1ce60f 100644 --- a/tests/p2p/test_protocol.py +++ b/tests/p2p/test_protocol.py @@ -2,6 +2,7 @@ from typing import Optional from unittest.mock import Mock, patch +import pytest from twisted.internet import defer from twisted.internet.address import IPv4Address from twisted.internet.protocol import Protocol @@ -118,8 +119,11 @@ def test_rate_limit(self) -> None: # Test empty disconnect self.conn.proto1.state = None - self.conn.proto1.connections = None - self.conn.proto1.on_disconnect(Failure(Exception())) + with pytest.raises(AssertionError): + # TODO: This raises because we are trying to disconnect a protocol with no state, but it's not possible + # for a 
protocol to have no state after it's handshaking. We have to update this when we introduce the + # new non-None initial state for protocols. + self.conn.proto1.on_disconnect(Failure(Exception())) def test_invalid_size(self) -> None: self.conn.tr1.clear() @@ -283,6 +287,44 @@ def test_hello_with_ipv6_capability(self) -> None: self.assertTrue('192.168.1.1' in map(lambda x: x.host, conn.proto2.peer.info.entrypoints)) self.assertEqual(conn.proto1.peer.info.entrypoints[0].host, '192.168.1.1') + def test_invalid_duplicate_addr(self) -> None: + """ + We try to connect to an already connected entrypoint in each state, + and it should never add the new connection to connecting_outbound_peers. + """ + # We also specifically compare localhost with 127.0.0.1, because they are considered the same. + assert self.conn.addr2.type == 'TCP' and self.conn.addr2.host == '127.0.0.1' + entrypoint = PeerEndpoint.parse(f'tcp://localhost:{self.conn.addr2.port}') + + self.manager1.connections.connect_to(entrypoint) + assert self.manager1.connections._connections.connecting_outbound_peers() == set() + assert self.manager1.connections._connections.handshaking_peers() == {self.conn.peer_addr2: self.conn.proto1} + assert self.manager1.connections._connections.ready_peers() == {} + self._check_result_only_cmd(self.conn.peek_tr1_value(), b'HELLO') + self._check_result_only_cmd(self.conn.peek_tr2_value(), b'HELLO') + + self.conn.run_one_step() # HELLO + self.manager1.connections.connect_to(entrypoint) + assert self.manager1.connections._connections.connecting_outbound_peers() == set() + assert self.manager1.connections._connections.handshaking_peers() == {self.conn.peer_addr2: self.conn.proto1} + assert self.manager1.connections._connections.ready_peers() == {} + self._check_result_only_cmd(self.conn.peek_tr1_value(), b'PEER-ID') + self._check_result_only_cmd(self.conn.peek_tr2_value(), b'PEER-ID') + + self.conn.run_one_step() # PEER-ID + self.manager1.connections.connect_to(entrypoint) + assert 
self.manager1.connections._connections.connecting_outbound_peers() == set() + assert self.manager1.connections._connections.handshaking_peers() == {self.conn.peer_addr2: self.conn.proto1} + assert self.manager1.connections._connections.ready_peers() == {} + self._check_result_only_cmd(self.conn.peek_tr1_value(), b'READY') + self._check_result_only_cmd(self.conn.peek_tr2_value(), b'READY') + + self.conn.run_one_step() # READY + self.manager1.connections.connect_to(entrypoint) + assert self.manager1.connections._connections.connecting_outbound_peers() == set() + assert self.manager1.connections._connections.handshaking_peers() == {} + assert self.manager1.connections._connections.ready_peers() == {self.conn.peer_addr2: self.conn.proto1} + def test_invalid_same_peer_id(self) -> None: manager3 = self.create_peer(self.network, peer=self.peer1) conn = FakeConnection(self.manager1, manager3) @@ -333,21 +375,24 @@ def test_invalid_same_peer_id2(self) -> None: # one of the peers will close the connection. 
We don't know which one, as it depends # on the peer ids - if self.conn.tr1.disconnecting or self.conn.tr2.disconnecting: - conn_dead = self.conn + if bytes(self.peer1.id) > bytes(self.peer2.id): + tr_dead = self.conn.tr1 + tr_dead_value = self.conn.peek_tr1_value() + proto_alive = conn.proto2 conn_alive = conn - elif conn.tr1.disconnecting or conn.tr2.disconnecting: - conn_dead = conn - conn_alive = self.conn else: - raise Exception('It should never happen.') - self._check_result_only_cmd(conn_dead.peek_tr1_value() + conn_dead.peek_tr2_value(), b'ERROR') + tr_dead = conn.tr2 + tr_dead_value = conn.peek_tr2_value() + proto_alive = self.conn.proto1 + conn_alive = self.conn + + self._check_result_only_cmd(tr_dead_value, b'ERROR') # at this point, the connection must be closing as the error was detected on READY state - self.assertIn(True, [conn_dead.tr1.disconnecting, conn_dead.tr2.disconnecting]) - # check connected_peers - connected_peers = list(self.manager1.connections.connected_peers.values()) - self.assertEquals(1, len(connected_peers)) - self.assertIn(connected_peers[0], [conn_alive.proto1, conn_alive.proto2]) + self.assertTrue(tr_dead.disconnecting) + # check ready_peers + ready_peers = list(self.manager1.connections.iter_ready_connections()) + self.assertEquals(1, len(ready_peers)) + self.assertEquals(ready_peers[0], proto_alive) # connection is still up self.assertIsConnected(conn_alive) @@ -427,32 +472,32 @@ def test_send_invalid_unicode(self) -> None: self.assertTrue(self.conn.tr1.disconnecting) def test_on_disconnect(self) -> None: - self.assertIn(self.conn.proto1, self.manager1.connections.handshaking_peers) + self.assertIn(self.conn.proto1, self.manager1.connections.iter_handshaking_peers()) self.conn.disconnect(Failure(Exception('testing'))) - self.assertNotIn(self.conn.proto1, self.manager1.connections.handshaking_peers) + self.assertNotIn(self.conn.proto1, self.manager1.connections.iter_handshaking_peers()) def test_on_disconnect_after_hello(self) 
-> None: self.conn.run_one_step() # HELLO - self.assertIn(self.conn.proto1, self.manager1.connections.handshaking_peers) + self.assertIn(self.conn.proto1, self.manager1.connections.iter_handshaking_peers()) self.conn.disconnect(Failure(Exception('testing'))) - self.assertNotIn(self.conn.proto1, self.manager1.connections.handshaking_peers) + self.assertNotIn(self.conn.proto1, self.manager1.connections.iter_handshaking_peers()) def test_on_disconnect_after_peer(self) -> None: self.conn.run_one_step() # HELLO - self.assertIn(self.conn.proto1, self.manager1.connections.handshaking_peers) + self.assertIn(self.conn.proto1, self.manager1.connections.iter_handshaking_peers()) # No peer id in the peer_storage (known_peers) self.assertNotIn(self.peer2.id, self.manager1.connections.verified_peer_storage) # The peer READY now depends on a message exchange from both peers, so we need one more step self.conn.run_one_step() # PEER-ID self.conn.run_one_step() # READY - self.assertIn(self.conn.proto1, self.manager1.connections.connected_peers.values()) + self.assertIn(self.conn.proto1, self.manager1.connections.iter_ready_connections()) # Peer id 2 in the peer_storage (known_peers) after connection self.assertIn(self.peer2.id, self.manager1.connections.verified_peer_storage) - self.assertNotIn(self.conn.proto1, self.manager1.connections.handshaking_peers) + self.assertNotIn(self.conn.proto1, self.manager1.connections.iter_handshaking_peers()) self.conn.disconnect(Failure(Exception('testing'))) # Peer id 2 in the peer_storage (known_peers) after disconnection but before looping call self.assertIn(self.peer2.id, self.manager1.connections.verified_peer_storage) - self.assertNotIn(self.conn.proto1, self.manager1.connections.connected_peers.values()) + self.assertNotIn(self.conn.proto1, self.manager1.connections.iter_ready_connections()) self.clock.advance(10) # Peer id 2 removed from peer_storage (known_peers) after disconnection and after looping call @@ -468,9 +513,9 @@ def 
test_invalid_expected_peer_id(self) -> None: p2p_manager: ConnectionsManager = self.manager2.connections # Initially, manager1 and manager2 are handshaking, from the setup - assert p2p_manager.connecting_peers == {} - assert p2p_manager.handshaking_peers == {self.conn.proto2} - assert p2p_manager.connected_peers == {} + assert p2p_manager._connections.connecting_outbound_peers() == set() + assert p2p_manager._connections.handshaking_peers() == {self.conn.peer_addr1: self.conn.proto2} + assert p2p_manager._connections.ready_peers() == {} # We change our peer id (on manager1) new_peer = PrivatePeer.auto_generated() @@ -488,9 +533,9 @@ def test_invalid_expected_peer_id_bootstrap(self) -> None: p2p_manager: ConnectionsManager = self.manager1.connections # Initially, manager1 and manager2 are handshaking, from the setup - assert p2p_manager.connecting_peers == {} - assert p2p_manager.handshaking_peers == {self.conn.proto1} - assert p2p_manager.connected_peers == {} + assert p2p_manager._connections.connecting_outbound_peers() == set() + assert p2p_manager._connections.handshaking_peers() == {self.conn.peer_addr2: self.conn.proto1} + assert p2p_manager._connections.ready_peers() == {} # We create a new manager3, and use it as a bootstrap in manager1 peer3 = PrivatePeer.auto_generated() @@ -498,9 +543,12 @@ def test_invalid_expected_peer_id_bootstrap(self) -> None: conn = FakeConnection(manager1=manager3, manager2=self.manager1, fake_bootstrap_id=peer3.id) # Now manager1 and manager3 are handshaking - assert p2p_manager.connecting_peers == {} - assert p2p_manager.handshaking_peers == {self.conn.proto1, conn.proto2} - assert p2p_manager.connected_peers == {} + assert p2p_manager._connections.connecting_outbound_peers() == set() + assert p2p_manager._connections.handshaking_peers() == { + self.conn.peer_addr2: self.conn.proto1, + conn.peer_addr1: conn.proto2, + } + assert p2p_manager._connections.ready_peers() == {} # We change our peer id (on manager3) new_peer = 
PrivatePeer.auto_generated() @@ -518,18 +566,21 @@ def test_valid_unset_peer_id_bootstrap(self) -> None: p2p_manager: ConnectionsManager = self.manager1.connections # Initially, manager1 and manager2 are handshaking, from the setup - assert p2p_manager.connecting_peers == {} - assert p2p_manager.handshaking_peers == {self.conn.proto1} - assert p2p_manager.connected_peers == {} + assert p2p_manager._connections.connecting_outbound_peers() == set() + assert p2p_manager._connections.handshaking_peers() == {self.conn.peer_addr2: self.conn.proto1} + assert p2p_manager._connections.ready_peers() == {} # We create a new manager3, and use it as a bootstrap in manager1, but without the peer_id manager3: HathorManager = self.create_peer(self.network) conn = FakeConnection(manager1=manager3, manager2=self.manager1, fake_bootstrap_id=None) # Now manager1 and manager3 are handshaking - assert p2p_manager.connecting_peers == {} - assert p2p_manager.handshaking_peers == {self.conn.proto1, conn.proto2} - assert p2p_manager.connected_peers == {} + assert p2p_manager._connections.connecting_outbound_peers() == set() + assert p2p_manager._connections.handshaking_peers() == { + self.conn.peer_addr2: self.conn.proto1, + conn.peer_addr1: conn.proto2, + } + assert p2p_manager._connections.ready_peers() == {} # We change our peer id (on manager3) new_peer = PrivatePeer.auto_generated() diff --git a/tests/p2p/test_sync_rate_limiter.py b/tests/p2p/test_sync_rate_limiter.py index 04d091c27..e550b7da4 100644 --- a/tests/p2p/test_sync_rate_limiter.py +++ b/tests/p2p/test_sync_rate_limiter.py @@ -31,7 +31,7 @@ def test_sync_rate_limiter(self) -> None: manager2.connections.disable_rate_limiter() manager2.connections.enable_rate_limiter(8, 2) - connected_peers2 = list(manager2.connections.connected_peers.values()) + connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] assert isinstance(protocol1.state, 
ReadyState) @@ -64,7 +64,7 @@ def test_sync_rate_limiter_disconnect(self) -> None: connections.rate_limiter.reset(connections.GlobalRateLimiter.SEND_TIPS) connections.enable_rate_limiter(1, 1) - connected_peers2 = list(manager2.connections.connected_peers.values()) + connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] @@ -114,7 +114,7 @@ def test_sync_rate_limiter_delayed_calls_draining(self) -> None: connections.rate_limiter.reset(connections.GlobalRateLimiter.SEND_TIPS) connections.enable_rate_limiter(1, 1) - connected_peers2 = list(manager2.connections.connected_peers.values()) + connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] @@ -154,7 +154,7 @@ def test_sync_rate_limiter_delayed_calls_stop(self) -> None: connections.rate_limiter.reset(connections.GlobalRateLimiter.SEND_TIPS) connections.enable_rate_limiter(1, 1) - connected_peers2 = list(manager2.connections.connected_peers.values()) + connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] diff --git a/tests/resources/p2p/test_status.py b/tests/resources/p2p/test_status.py index 646ba6903..b73634bd4 100644 --- a/tests/resources/p2p/test_status.py +++ b/tests/resources/p2p/test_status.py @@ -1,4 +1,3 @@ -from twisted.internet import endpoints from twisted.internet.address import IPv4Address from twisted.internet.defer import inlineCallbacks @@ -68,18 +67,18 @@ def test_handshaking(self): self.assertEqual(server_data['network'], 'testnet') self.assertGreater(server_data['uptime'], 0) - handshake_peer = self.conn1.proto1.transport.getPeer() - handshake_address = '{}:{}'.format(handshake_peer.host, handshake_peer.port) - self.assertEqual(len(known_peers), 0) self.assertEqual(len(connections['connected_peers']), 0) 
self.assertEqual(len(connections['handshaking_peers']), 1) - self.assertEqual(connections['handshaking_peers'][0]['address'], handshake_address) + self.assertEqual(connections['handshaking_peers'][0]['address'], str(self.conn1.proto1.addr)) @inlineCallbacks def test_get_with_one_peer(self): + assert self.conn1.peek_tr1_value().startswith(b'HELLO') self.conn1.run_one_step() # HELLO + assert self.conn1.peek_tr1_value().startswith(b'PEER-ID') self.conn1.run_one_step() # PEER-ID + assert self.conn1.peek_tr1_value().startswith(b'READY') self.conn1.run_one_step() # READY self.conn1.run_one_step() # BOTH PEERS ARE READY NOW @@ -100,17 +99,14 @@ def test_get_with_one_peer(self): @inlineCallbacks def test_connecting_peers(self): - address = '192.168.1.1:54321' - endpoint = endpoints.clientFromString(self.manager.reactor, 'tcp:{}'.format(address)) - deferred = endpoint.connect - self.manager.connections.connecting_peers[endpoint] = deferred + peer_address = PeerAddress.parse('tcp://192.168.1.1:54321') + self.manager.connections._connections._connecting_outbound.add(peer_address) response = yield self.web.get("status") data = response.json_value() connecting = data['connections']['connecting_peers'] self.assertEqual(len(connecting), 1) - self.assertEqual(connecting[0]['address'], address) - self.assertIsNotNone(connecting[0]['deferred']) + self.assertEqual(connecting[0]['address'], str(peer_address)) class SyncV1StatusTest(unittest.SyncV1Params, BaseStatusTest): diff --git a/tests/sysctl/test_p2p.py b/tests/sysctl/test_p2p.py index ec0366888..a3703676a 100644 --- a/tests/sysctl/test_p2p.py +++ b/tests/sysctl/test_p2p.py @@ -2,6 +2,7 @@ import tempfile from unittest.mock import MagicMock +from hathor.p2p.peer_endpoint import PeerAddress from hathor.p2p.peer_id import PeerId from hathor.sysctl import ConnectionsManagerSysctl from hathor.sysctl.exception import SysctlException @@ -172,7 +173,9 @@ def test_kill_one_connection(self): peer_id = 
'0e2bd0d8cd1fb6d040801c32ec27e8986ce85eb8810b6c878dcad15bce3b5b1e' conn = MagicMock() - p2p_manager.connected_peers[PeerId(peer_id)] = conn + conn.addr = PeerAddress.parse('tcp://localhost:40403') + p2p_manager._connections.on_connected(protocol=conn) + p2p_manager._connections.on_ready(addr=conn.addr, peer_id=PeerId(peer_id)) self.assertEqual(conn.disconnect.call_count, 0) sysctl.unsafe_set('kill_connection', peer_id) self.assertEqual(conn.disconnect.call_count, 1)