diff --git a/extras/update_checkpoints.py b/extras/update_checkpoints.py new file mode 100755 index 000000000..e83538b5c --- /dev/null +++ b/extras/update_checkpoints.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python +""" +Usage: update_checkpoints.py [-h] [-n NETWORK] + +Helper script to update the config checkpoint list. + +options: + -h, --help show this help message and exit + -n NETWORK, --network NETWORK + The network to update (default: mainnet) + +For example: + +$ ./extras/update_checkpoints.py +New checkpoints to add for mainnet: + + 4_800_000: 00000000000000000716b8d9e96591ba7cb2d02c3d2d1d98d514f41c240fdff7 + 4_900_000: 0000000000000000079b1c1ebf48d351a7d31dcc55c5b4cf79ade79089a20f5a + 5_000_000: 000000000000000006c9167db1cc7e93fcf1c3014da6c6221390d03d1640c9b3 + + cp(4_800_000, bytes.fromhex('00000000000000000716b8d9e96591ba7cb2d02c3d2d1d98d514f41c240fdff7')), + cp(4_900_000, bytes.fromhex('0000000000000000079b1c1ebf48d351a7d31dcc55c5b4cf79ade79089a20f5a')), + cp(5_000_000, bytes.fromhex('000000000000000006c9167db1cc7e93fcf1c3014da6c6221390d03d1640c9b3')), + +The output can then be copied and pasted into `hathor/conf/mainnet.yml` and `hathor/conf/mainnet.py` +""" + +import requests +import yaml +import argparse + +# Built-in network configurations +NETWORKS: dict[str, dict[str, str]] = { + 'mainnet': { + 'config_file': 'hathor/conf/mainnet.yml', + 'node_url': 'https://node1.mainnet.hathor.network/v1a', + }, + 'testnet': { + 'config_file': 'hathor/conf/testnet.yml', + 'node_url': 'https://node1.golf.testnet.hathor.network/v1a', + }, + # Add more networks as needed +} + +CHECKPOINT_INTERVAL: int = 100_000 + + +def get_latest_height(node_url: str) -> int: + """Fetch the latest block height.""" + response = requests.get(f'{node_url}/transaction?type=block&count=1') + response.raise_for_status() + return response.json()['transactions'][0]['height'] + + +def get_hash_for_height(node_url: str, height: int) -> str: + """Fetch the hash for a given block height.""" + 
response = requests.get(f'{node_url}/block_at_height?height={height}') + response.raise_for_status() + return response.json()['block']['tx_id'] + + +def load_checkpoints(config_file: str) -> dict[str, int]: + """Load the checkpoints from the specified YAML config file.""" + with open(config_file, 'r') as file: + data = yaml.safe_load(file) + return data.get('CHECKPOINTS', {}) + + +def print_new_checkpoints(network_name: str) -> None: + """Print new checkpoints for the specified network.""" + if network_name not in NETWORKS: + print(f'Error: Unknown network {network_name}. Available networks: {", ".join(NETWORKS.keys())}') + return + + # Get the network configuration + network_config = NETWORKS[network_name] + config_file = network_config['config_file'] + node_url = network_config['node_url'] + + # Load existing checkpoints from the YAML file + current_checkpoints = load_checkpoints(config_file) + + # Get the latest block height + latest_height = get_latest_height(node_url) + + # Determine missing checkpoints + new_checkpoints = {} + for height in range(CHECKPOINT_INTERVAL, latest_height + 1, CHECKPOINT_INTERVAL): + if height not in current_checkpoints: + block_hash = get_hash_for_height(node_url, height) + new_checkpoints[height] = block_hash + + # Print new checkpoints + if new_checkpoints: + print(f'New checkpoints to add for {network_name}:\n') + for height, block_hash in sorted(new_checkpoints.items()): + print(f' {height:_}: {block_hash}') + print() + for height, block_hash in sorted(new_checkpoints.items()): + print(f''' cp({height:_}, bytes.fromhex('{block_hash}')),''') + else: + print(f'No new checkpoints needed for {network_name}. 
All up to date.') + + +if __name__ == '__main__': + # Parse command-line arguments + parser = argparse.ArgumentParser(description='Helper script to update the config checkpoint list.') + parser.add_argument('-n', '--network', default='mainnet', help='The network to update (default: mainnet)') + args = parser.parse_args() + + # Print new checkpoints for the specified network + print_new_checkpoints(args.network) diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index ea3afd8f7..b2d7e2890 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -202,6 +202,9 @@ def __init__(self) -> None: self._poa_signer: PoaSigner | None = None self._poa_block_producer: PoaBlockProducer | None = None + self._enable_ipv6: bool = False + self._disable_ipv4: bool = False + def build(self) -> BuildArtifacts: if self.artifacts is not None: raise ValueError('cannot call build twice') @@ -426,6 +429,8 @@ def _get_or_create_p2p_manager(self) -> ConnectionsManager: ssl=enable_ssl, whitelist_only=False, rng=self._rng, + enable_ipv6=self._enable_ipv6, + disable_ipv4=self._disable_ipv4, ) SyncSupportLevel.add_factories( self._get_or_create_settings(), @@ -812,6 +817,16 @@ def disable_full_verification(self) -> 'Builder': self._full_verification = False return self + def enable_ipv6(self) -> 'Builder': + self.check_if_can_modify() + self._enable_ipv6 = True + return self + + def disable_ipv4(self) -> 'Builder': + self.check_if_can_modify() + self._disable_ipv4 = True + return self + def set_soft_voided_tx_ids(self, soft_voided_tx_ids: set[bytes]) -> 'Builder': self.check_if_can_modify() self._soft_voided_tx_ids = soft_voided_tx_ids diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index 464d9b319..059007190 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -242,6 +242,9 @@ def create_manager(self, reactor: Reactor) -> HathorManager: pubsub = PubSubManager(reactor) if self._args.x_enable_event_queue: + 
self.log.warn('--x-enable-event-queue is deprecated and will be removed, use --enable-event-queue instead') + + if self._args.x_enable_event_queue or self._args.enable_event_queue: self.event_ws_factory = EventWebsocketFactory( peer_id=str(peer.id), settings=settings, @@ -270,8 +273,8 @@ def create_manager(self, reactor: Reactor) -> HathorManager: full_verification = False if self._args.x_full_verification: self.check_or_raise( - not self._args.x_enable_event_queue, - '--x-full-verification cannot be used with --x-enable-event-queue' + not self._args.x_enable_event_queue and not self._args.enable_event_queue, + '--x-full-verification cannot be used with --enable-event-queue' ) full_verification = True @@ -282,8 +285,8 @@ def create_manager(self, reactor: Reactor) -> HathorManager: execution_manager=execution_manager ) - if self._args.x_enable_event_queue: - self.log.info('--x-enable-event-queue flag provided. ' + if self._args.x_enable_event_queue or self._args.enable_event_queue: + self.log.info('--enable-event-queue flag provided. 
' 'The events detected by the full node will be stored and can be retrieved by clients') self.feature_service = FeatureService(settings=settings, tx_storage=tx_storage) @@ -326,6 +329,8 @@ def create_manager(self, reactor: Reactor) -> HathorManager: ssl=True, whitelist_only=False, rng=Random(), + enable_ipv6=self._args.x_enable_ipv6, + disable_ipv4=self._args.x_disable_ipv4, ) vertex_handler = VertexHandler( @@ -376,7 +381,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager: checkpoints=settings.CHECKPOINTS, environment_info=get_environment_info(args=str(self._args), peer_id=str(peer.id)), full_verification=full_verification, - enable_event_queue=self._args.x_enable_event_queue, + enable_event_queue=self._args.x_enable_event_queue or self._args.enable_event_queue, bit_signaling_service=bit_signaling_service, verification_service=verification_service, cpu_mining_service=cpu_mining_service, diff --git a/hathor/builder/resources_builder.py b/hathor/builder/resources_builder.py index f067cc3d9..bfc47e7a7 100644 --- a/hathor/builder/resources_builder.py +++ b/hathor/builder/resources_builder.py @@ -307,7 +307,7 @@ def create_resources(self) -> server.Site: ws_factory.subscribe(self.manager.pubsub) # Event websocket resource - if self._args.x_enable_event_queue: + if self._args.x_enable_event_queue or self._args.enable_event_queue: root.putChild(b'event_ws', WebSocketResource(self.event_ws_factory)) root.putChild(b'event', EventResource(self.manager._event_manager)) diff --git a/hathor/cli/events_simulator/events_simulator.py b/hathor/cli/events_simulator/events_simulator.py index 897c57bf3..23fe64418 100644 --- a/hathor/cli/events_simulator/events_simulator.py +++ b/hathor/cli/events_simulator/events_simulator.py @@ -88,6 +88,8 @@ def execute(args: Namespace, reactor: 'ReactorProtocol') -> None: forwarding_ws_factory.start(stream_id='simulator_stream_id') scenario.simulate(simulator, manager) + assert manager.wallet is not None + log.info('final result', 
balances=manager.wallet.get_balance_per_address(simulator.settings.HATHOR_TOKEN_UID)) reactor.listenTCP(args.port, site) reactor.run() diff --git a/hathor/cli/nginx_config.py b/hathor/cli/nginx_config.py index 5c6f2a874..9f8684f0a 100644 --- a/hathor/cli/nginx_config.py +++ b/hathor/cli/nginx_config.py @@ -240,11 +240,12 @@ def generate_nginx_config(openapi: dict[str, Any], *, out_file: TextIO, rate_k: server_open = f''' upstream backend {{ - server fullnode:8080; + server 127.0.0.1:8080; }} server {{ listen 80; + listen [::]:80; server_name localhost; # Look for client IP in the X-Forwarded-For header diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py index 9498194ab..ba4f40816 100644 --- a/hathor/cli/run_node.py +++ b/hathor/cli/run_node.py @@ -55,7 +55,6 @@ class RunNode: ('--x-sync-bridge', lambda args: bool(args.x_sync_bridge)), ('--x-sync-v1-only', lambda args: bool(args.x_sync_v1_only)), ('--x-sync-v2-only', lambda args: bool(args.x_sync_v2_only)), - ('--x-enable-event-queue', lambda args: bool(args.x_enable_event_queue)), ('--x-asyncio-reactor', lambda args: bool(args.x_asyncio_reactor)), ('--x-ipython-kernel', lambda args: bool(args.x_ipython_kernel)), ] @@ -93,7 +92,9 @@ def create_parser(cls) -> ArgumentParser: help='Address to listen for new connections (eg: tcp:8000)') parser.add_argument('--bootstrap', action='append', help='Address to connect to (eg: tcp:127.0.0.1:8000') parser.add_argument('--status', type=int, help='Port to run status server') + parser.add_argument('--x-status-ipv6-interface', help='IPv6 interface to bind the status server') parser.add_argument('--stratum', type=int, help='Port to run stratum server') + parser.add_argument('--x-stratum-ipv6-interface', help='IPv6 interface to bind the stratum server') parser.add_argument('--data', help='Data directory') storage = parser.add_mutually_exclusive_group() storage.add_argument('--rocksdb-storage', action='store_true', help='Use RocksDB storage backend (default)') @@ -144,7 
+145,9 @@ def create_parser(cls) -> ArgumentParser: sync_args.add_argument('--x-sync-bridge', action='store_true', help='Enable running both sync protocols.') parser.add_argument('--x-localhost-only', action='store_true', help='Only connect to peers on localhost') parser.add_argument('--x-rocksdb-indexes', action='store_true', help=SUPPRESS) - parser.add_argument('--x-enable-event-queue', action='store_true', help='Enable event queue mechanism') + parser.add_argument('--x-enable-event-queue', action='store_true', + help='Deprecated: use --enable-event-queue instead.') + parser.add_argument('--enable-event-queue', action='store_true', help='Enable event queue mechanism') parser.add_argument('--peer-id-blacklist', action='extend', default=[], nargs='+', type=str, help='Peer IDs to forbid connection') parser.add_argument('--config-yaml', type=str, help='Configuration yaml filepath') @@ -162,6 +165,10 @@ def create_parser(cls) -> ArgumentParser: help='Log tx bytes for debugging') parser.add_argument('--disable-ws-history-streaming', action='store_true', help='Disable websocket history streaming API') + parser.add_argument('--x-enable-ipv6', action='store_true', + help='Enables listening on IPv6 interface and connecting to IPv6 peers') + parser.add_argument('--x-disable-ipv4', action='store_true', + help='Disables connecting to IPv4 peers') return parser def prepare(self, *, register_resources: bool = True) -> None: @@ -181,6 +188,7 @@ def prepare(self, *, register_resources: bool = True) -> None: print('Maximum number of open file descriptors is too low. 
Minimum required is 256.') sys.exit(-2) + self.validate_args() self.check_unsafe_arguments() self.check_python_version() @@ -202,7 +210,15 @@ def prepare(self, *, register_resources: bool = True) -> None: if self._args.stratum: assert self.manager.stratum_factory is not None - self.reactor.listenTCP(self._args.stratum, self.manager.stratum_factory) + + if self._args.x_enable_ipv6: + interface = self._args.x_stratum_ipv6_interface or '::0' + # Linux by default will map IPv4 to IPv6, so listening only in the IPv6 interface will be + # enough to handle IPv4 connections. There is a kernel parameter that controls this behavior: + # https://sysctl-explorer.net/net/ipv6/bindv6only/ + self.reactor.listenTCP(self._args.stratum, self.manager.stratum_factory, interface=interface) + else: + self.reactor.listenTCP(self._args.stratum, self.manager.stratum_factory) from hathor.conf.get_settings import get_global_settings settings = get_global_settings() @@ -217,7 +233,12 @@ def prepare(self, *, register_resources: bool = True) -> None: status_server = resources_builder.build() if self._args.status: assert status_server is not None - self.reactor.listenTCP(self._args.status, status_server) + + if self._args.x_enable_ipv6: + interface = self._args.x_status_ipv6_interface or '::0' + self.reactor.listenTCP(self._args.status, status_server, interface=interface) + else: + self.reactor.listenTCP(self._args.status, status_server) self.start_manager() @@ -351,6 +372,11 @@ def run_sysctl_from_signal(self) -> None: except SysctlRunnerException as e: self.log.warn('[USR2] Error', errmsg=str(e)) + def validate_args(self) -> None: + if self._args.x_disable_ipv4 and not self._args.x_enable_ipv6: + self.log.critical('You must enable IPv6 if you disable IPv4.') + sys.exit(-1) + def check_unsafe_arguments(self) -> None: unsafe_args_found = [] for arg_cmdline, arg_test_fn in self.UNSAFE_ARGUMENTS: diff --git a/hathor/cli/run_node_args.py b/hathor/cli/run_node_args.py index f493a7d33..6f076253f 
100644 --- a/hathor/cli/run_node_args.py +++ b/hathor/cli/run_node_args.py @@ -36,7 +36,9 @@ class RunNodeArgs(BaseModel, extra=Extra.allow): listen: list[str] bootstrap: Optional[list[str]] status: Optional[int] + x_status_ipv6_interface: Optional[str] stratum: Optional[int] + x_stratum_ipv6_interface: Optional[str] data: Optional[str] rocksdb_storage: bool memory_storage: bool @@ -74,6 +76,7 @@ class RunNodeArgs(BaseModel, extra=Extra.allow): x_localhost_only: bool x_rocksdb_indexes: bool x_enable_event_queue: bool + enable_event_queue: bool peer_id_blacklist: list[str] config_yaml: Optional[str] signal_support: set[Feature] @@ -83,3 +86,5 @@ class RunNodeArgs(BaseModel, extra=Extra.allow): nano_testnet: bool log_vertex_bytes: bool disable_ws_history_streaming: bool + x_enable_ipv6: bool + x_disable_ipv4: bool diff --git a/hathor/conf/mainnet.py b/hathor/conf/mainnet.py index f580f9481..c5614e34b 100644 --- a/hathor/conf/mainnet.py +++ b/hathor/conf/mainnet.py @@ -72,6 +72,20 @@ cp(3_400_000, bytes.fromhex('000000000000000077242c961a0c6f708bc671a8372eb8b095311f091fddc6c3')), cp(3_500_000, bytes.fromhex('000000000000000a34ba20552c3cae9549b9c5ca07f644cf005328c948aa54d8')), cp(3_600_000, bytes.fromhex('000000000000000011031d9ff030cd9e6fe8a3766bbeda6f6337c40dd30fa65f')), + cp(3_700_000, bytes.fromhex('0000000000000006c6e7295efcf0929173cc47ece41afc652410b72f36cbeeda')), + cp(3_800_000, bytes.fromhex('00000000000000122f57d59c7f6736a83483dcf71c34978102d7e04ce4dc9a5d')), + cp(3_900_000, bytes.fromhex('00000000000000069edf3300d6c41451485d7aabdbea34425a2411b880e8a976')), + cp(4_000_000, bytes.fromhex('00000000000000043b11a6c86c3cdaf773a5183737f136e196e816f862e1e3ba')), + cp(4_100_000, bytes.fromhex('0000000000000020822d529b6fcd8611f5a174b1f44a6c478a2fec64a80233ad')), + cp(4_200_000, bytes.fromhex('00000000000000052ffc34875fab4e545bc9dc76f1212c4fdafab3b6d7a026cd')), + cp(4_300_000, bytes.fromhex('000000000000000e1ea2af0e25087c0977e944dd0ffdae5fdff54dda85ed95be')), + 
cp(4_400_000, bytes.fromhex('0000000000000000020dab883c57e21829b590ef61ff5230f5fdc9d572300945')), + cp(4_500_000, bytes.fromhex('00000000000000034d5ddf802a8ac8fbf17cf50747041e433d28d9f2bcb6ef02')), + cp(4_600_000, bytes.fromhex('000000000000000055bb4e5b6d942da13cb631f318cfdc292793f28ef8a338ca')), + cp(4_700_000, bytes.fromhex('000000000000000002ae1d75811b1050fc98ee7ef30c48cde117ebbb42f47e22')), + cp(4_800_000, bytes.fromhex('00000000000000000716b8d9e96591ba7cb2d02c3d2d1d98d514f41c240fdff7')), + cp(4_900_000, bytes.fromhex('0000000000000000079b1c1ebf48d351a7d31dcc55c5b4cf79ade79089a20f5a')), + cp(5_000_000, bytes.fromhex('000000000000000006c9167db1cc7e93fcf1c3014da6c6221390d03d1640c9b3')), ], SOFT_VOIDED_TX_IDS=list(map(bytes.fromhex, [ '0000000012a922a6887497bed9c41e5ed7dc7213cae107db295602168266cd02', diff --git a/hathor/conf/mainnet.yml b/hathor/conf/mainnet.yml index d32845449..d05a07e08 100644 --- a/hathor/conf/mainnet.yml +++ b/hathor/conf/mainnet.yml @@ -53,6 +53,20 @@ CHECKPOINTS: 3_400_000: 000000000000000077242c961a0c6f708bc671a8372eb8b095311f091fddc6c3 3_500_000: 000000000000000a34ba20552c3cae9549b9c5ca07f644cf005328c948aa54d8 3_600_000: 000000000000000011031d9ff030cd9e6fe8a3766bbeda6f6337c40dd30fa65f + 3_700_000: 0000000000000006c6e7295efcf0929173cc47ece41afc652410b72f36cbeeda + 3_800_000: 00000000000000122f57d59c7f6736a83483dcf71c34978102d7e04ce4dc9a5d + 3_900_000: 00000000000000069edf3300d6c41451485d7aabdbea34425a2411b880e8a976 + 4_000_000: 00000000000000043b11a6c86c3cdaf773a5183737f136e196e816f862e1e3ba + 4_100_000: 0000000000000020822d529b6fcd8611f5a174b1f44a6c478a2fec64a80233ad + 4_200_000: 00000000000000052ffc34875fab4e545bc9dc76f1212c4fdafab3b6d7a026cd + 4_300_000: 000000000000000e1ea2af0e25087c0977e944dd0ffdae5fdff54dda85ed95be + 4_400_000: 0000000000000000020dab883c57e21829b590ef61ff5230f5fdc9d572300945 + 4_500_000: 00000000000000034d5ddf802a8ac8fbf17cf50747041e433d28d9f2bcb6ef02 + 4_600_000: 
000000000000000055bb4e5b6d942da13cb631f318cfdc292793f28ef8a338ca + 4_700_000: 000000000000000002ae1d75811b1050fc98ee7ef30c48cde117ebbb42f47e22 + 4_800_000: 00000000000000000716b8d9e96591ba7cb2d02c3d2d1d98d514f41c240fdff7 + 4_900_000: 0000000000000000079b1c1ebf48d351a7d31dcc55c5b4cf79ade79089a20f5a + 5_000_000: 000000000000000006c9167db1cc7e93fcf1c3014da6c6221390d03d1640c9b3 SOFT_VOIDED_TX_IDS: - 0000000012a922a6887497bed9c41e5ed7dc7213cae107db295602168266cd02 diff --git a/hathor/conf/settings.py b/hathor/conf/settings.py index db235f2b7..9fd45bc3f 100644 --- a/hathor/conf/settings.py +++ b/hathor/conf/settings.py @@ -288,6 +288,9 @@ def GENESIS_TX2_TIMESTAMP(self) -> int: # Maximum period without receiving any messages from ther peer (in seconds). PEER_IDLE_TIMEOUT: int = 60 + # Maximum number of entrypoints that we accept that a peer broadcasts + PEER_MAX_ENTRYPOINTS: int = 30 + # Filepath of ca certificate file to generate connection certificates CA_FILEPATH: str = os.path.join(os.path.dirname(__file__), '../p2p/ca.crt') @@ -364,6 +367,7 @@ def GENESIS_TX2_TIMESTAMP(self) -> int: CAPABILITY_WHITELIST: str = 'whitelist' CAPABILITY_SYNC_VERSION: str = 'sync-version' CAPABILITY_GET_BEST_BLOCKCHAIN: str = 'get-best-blockchain' + CAPABILITY_IPV6: str = 'ipv6' # peers announcing this capability will be relayed ipv6 entrypoints from other peers # Where to download whitelist from WHITELIST_URL: Optional[str] = None @@ -430,6 +434,12 @@ def GENESIS_TX2_TIMESTAMP(self) -> int: # more than enough for the forseeable future MAX_MEMPOOL_RECEIVING_TIPS: int = 1000 + # Max number of peers simultanously stored in the node + MAX_VERIFIED_PEERS: int = 10_000 + + # Max number of peers simultanously stored per-connection + MAX_UNVERIFIED_PEERS_PER_CONN: int = 100 + # Used to enable nano contracts. 
# # This should NEVER be enabled for mainnet and testnet, since both networks will diff --git a/hathor/conf/testnet.py b/hathor/conf/testnet.py index 4743e9f4e..f2d322489 100644 --- a/hathor/conf/testnet.py +++ b/hathor/conf/testnet.py @@ -53,6 +53,33 @@ cp(1_400_000, bytes.fromhex('000000000df9cb786c68a643a52a67c22ab54e8b8e41cbe9b761133f6c8abbfe')), cp(1_500_000, bytes.fromhex('000000000c3591805f4748480b59ac1788f754fc004930985a487580e2b5de8f')), cp(1_600_000, bytes.fromhex('00000000060adfdfd7d488d4d510b5779cf35a3c50df7bcff941fbb6957be4d2')), + cp(1_700_000, bytes.fromhex('0000000007afc04aebad15b14fcd93c1b5193dc503b190433f55be8c218b6d12')), + cp(1_800_000, bytes.fromhex('00000000126f16af2ba934a60cf8f2da32d3ed2688c56ce8ff477e483a3ffc42')), + cp(1_900_000, bytes.fromhex('0000000005d2a2ba2231663187b460396189af0ffca7b2e93fccc85cde04cbdc')), + cp(2_000_000, bytes.fromhex('000000000009a8451ff2d5ec54951d717da2766aedb3131485466cc993879ee1')), + cp(2_100_000, bytes.fromhex('0000000009f961804cd7f43da05f08a94a2fa09f82c7d605afc5982ab242a7e4')), + cp(2_200_000, bytes.fromhex('0000000002e260b970846a89c23e754a763e7c5f1578b6ec4e67bdb94c667997')), + cp(2_300_000, bytes.fromhex('0000000006e0894c8f7fd029fe446a42433875647759183ba3fbb0ff0b7ceb64')), + cp(2_400_000, bytes.fromhex('0000000011ab28f3be17e3a098307fa73750cc8d74f1f60cfb44b524a60c94ec')), + cp(2_500_000, bytes.fromhex('00000000045d2bcc10c896bfc7d1f28788e3530a81f50ee096f386eec772634f')), + cp(2_600_000, bytes.fromhex('000000000766b9ac25e2ece5685effa834e61284e38f368c841210606bb1fdfc')), + cp(2_700_000, bytes.fromhex('0000000005d0ee31d0f47f6ff9aa570b9f25b9d44a8a59cea0e0f8a1729b9c90')), + cp(2_800_000, bytes.fromhex('000000000a5bd4f266fa13d2c0594cabf6465758f7f5814bde626032706b81e5')), + cp(2_900_000, bytes.fromhex('000000000b11b0a09ff0d7c2cfd9228f31c53008e700532e439d3a3d9c63fb8e')), + cp(3_000_000, bytes.fromhex('00000000013289569569cd51580183a2c870dfe5a395adaa00ae66fefe51af3d')), + cp(3_100_000, 
bytes.fromhex('00000000170c55e6ec207400bfc42786c1e0c32fe045a1d815f930daf2bf3020')), + cp(3_200_000, bytes.fromhex('00000000149986cb99c202136bd388fb2a7fcba4bdfd6ac049069ac5e08a587f')), + cp(3_300_000, bytes.fromhex('000000000e16f87ac7133639cb52a99574944b8457939396e7faf1615fcfdb0f')), + cp(3_400_000, bytes.fromhex('000000000f551f6224a459904436072f5ff10fd3db17f2d7e25b1ef9b149c121')), + cp(3_500_000, bytes.fromhex('0000000006572b8cf41130e88776adf8583e970905df2afe593ca31c91ab0c4c')), + cp(3_600_000, bytes.fromhex('000000000215fcc7018cc31bbfb943ca43c6297529fa008bf34665f3ac64d340')), + cp(3_700_000, bytes.fromhex('000000000dbf5e8ab4f90f2187db6db429c9d0cb8169051ce8a9e79b810509d7')), + cp(3_800_000, bytes.fromhex('00000000030411ec36c7f5386a94e147460d86592f85459e0eadd5cd0e3da7b4')), + cp(3_900_000, bytes.fromhex('000000000bc2c7078a3c59d878196f1491aad45a0df9d312909d85482ac8d714')), + cp(4_000_000, bytes.fromhex('000000000eba0dae3ec27cf5596ef49731744edebadb9fbae42160b6aa2e2461')), + cp(4_100_000, bytes.fromhex('00000000052aa77fd8db71d5306257f9fe068c3401d95b17fcedcccfc9b76c82')), + cp(4_200_000, bytes.fromhex('00000000010a8dae043c84fcb2cef6a2b42a28279b95af20ab5a098acf2a3565')), + cp(4_300_000, bytes.fromhex('000000000019da781ef75fa5f59c5537d8ed18b64c589c3e036109cfb1d84f7d')), ], FEATURE_ACTIVATION=FeatureActivationSettings( default_threshold=15_120, # 15120 = 75% of evaluation_interval (20160) @@ -73,11 +100,11 @@ # NOP feature to test Feature Activation for Transactions Feature.NOP_FEATURE_1: Criteria( bit=0, - # N = 4_354_560 - # Expected to be reached around Sunday, 2024-11-17. - # Right now the best block is 4_326_600 on testnet (2024-11-07). - start_height=4_354_560, # N - timeout_height=4_394_880, # N + 2 * 20160 (2 weeks after the start) + # N = 4_394_880 + # start_height expected to be reached around Sunday, 2024-12-01. + # Right now the best block is 4_377_375 on testnet (2024-11-25). 
+ start_height=4_394_880, # N + timeout_height=4_475_520, # N + 4 * 20160 (4 weeks after the start) minimum_activation_height=0, lock_in_on_timeout=False, version='0.63.0', diff --git a/hathor/conf/testnet.yml b/hathor/conf/testnet.yml index 8babd1f32..f8dcf5290 100644 --- a/hathor/conf/testnet.yml +++ b/hathor/conf/testnet.yml @@ -35,6 +35,33 @@ CHECKPOINTS: 1_400_000: 000000000df9cb786c68a643a52a67c22ab54e8b8e41cbe9b761133f6c8abbfe 1_500_000: 000000000c3591805f4748480b59ac1788f754fc004930985a487580e2b5de8f 1_600_000: 00000000060adfdfd7d488d4d510b5779cf35a3c50df7bcff941fbb6957be4d2 + 1_700_000: 0000000007afc04aebad15b14fcd93c1b5193dc503b190433f55be8c218b6d12 + 1_800_000: 00000000126f16af2ba934a60cf8f2da32d3ed2688c56ce8ff477e483a3ffc42 + 1_900_000: 0000000005d2a2ba2231663187b460396189af0ffca7b2e93fccc85cde04cbdc + 2_000_000: 000000000009a8451ff2d5ec54951d717da2766aedb3131485466cc993879ee1 + 2_100_000: 0000000009f961804cd7f43da05f08a94a2fa09f82c7d605afc5982ab242a7e4 + 2_200_000: 0000000002e260b970846a89c23e754a763e7c5f1578b6ec4e67bdb94c667997 + 2_300_000: 0000000006e0894c8f7fd029fe446a42433875647759183ba3fbb0ff0b7ceb64 + 2_400_000: 0000000011ab28f3be17e3a098307fa73750cc8d74f1f60cfb44b524a60c94ec + 2_500_000: 00000000045d2bcc10c896bfc7d1f28788e3530a81f50ee096f386eec772634f + 2_600_000: 000000000766b9ac25e2ece5685effa834e61284e38f368c841210606bb1fdfc + 2_700_000: 0000000005d0ee31d0f47f6ff9aa570b9f25b9d44a8a59cea0e0f8a1729b9c90 + 2_800_000: 000000000a5bd4f266fa13d2c0594cabf6465758f7f5814bde626032706b81e5 + 2_900_000: 000000000b11b0a09ff0d7c2cfd9228f31c53008e700532e439d3a3d9c63fb8e + 3_000_000: 00000000013289569569cd51580183a2c870dfe5a395adaa00ae66fefe51af3d + 3_100_000: 00000000170c55e6ec207400bfc42786c1e0c32fe045a1d815f930daf2bf3020 + 3_200_000: 00000000149986cb99c202136bd388fb2a7fcba4bdfd6ac049069ac5e08a587f + 3_300_000: 000000000e16f87ac7133639cb52a99574944b8457939396e7faf1615fcfdb0f + 3_400_000: 000000000f551f6224a459904436072f5ff10fd3db17f2d7e25b1ef9b149c121 + 
3_500_000: 0000000006572b8cf41130e88776adf8583e970905df2afe593ca31c91ab0c4c + 3_600_000: 000000000215fcc7018cc31bbfb943ca43c6297529fa008bf34665f3ac64d340 + 3_700_000: 000000000dbf5e8ab4f90f2187db6db429c9d0cb8169051ce8a9e79b810509d7 + 3_800_000: 00000000030411ec36c7f5386a94e147460d86592f85459e0eadd5cd0e3da7b4 + 3_900_000: 000000000bc2c7078a3c59d878196f1491aad45a0df9d312909d85482ac8d714 + 4_000_000: 000000000eba0dae3ec27cf5596ef49731744edebadb9fbae42160b6aa2e2461 + 4_100_000: 00000000052aa77fd8db71d5306257f9fe068c3401d95b17fcedcccfc9b76c82 + 4_200_000: 00000000010a8dae043c84fcb2cef6a2b42a28279b95af20ab5a098acf2a3565 + 4_300_000: 000000000019da781ef75fa5f59c5537d8ed18b64c589c3e036109cfb1d84f7d FEATURE_ACTIVATION: default_threshold: 15_120 # 15120 = 75% of evaluation_interval (20160) @@ -54,11 +81,11 @@ FEATURE_ACTIVATION: # NOP feature to test Feature Activation for Transactions NOP_FEATURE_1: bit: 0 - # N = 4_354_560 - # Expected to be reached around Sunday, 2024-11-17. - # Right now the best block is 4_326_600 on testnet (2024-11-07). - start_height: 4_354_560 # N - timeout_height: 4_394_880 # N + 2 * 20160 (2 weeks after the start) + # N = 4_394_880 + # start_height expected to be reached around Sunday, 2024-12-01. + # Right now the best block is 4_377_375 on testnet (2024-11-25). 
+ start_height: 4_394_880 # N + timeout_height: 4_475_520 # N + 4 * 20160 (4 weeks after the start) minimum_activation_height: 0 lock_in_on_timeout: false version: 0.63.0 diff --git a/hathor/manager.py b/hathor/manager.py index cc86dd9dc..8751e5427 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -251,7 +251,8 @@ def get_default_capabilities(self) -> list[str]: return [ self._settings.CAPABILITY_WHITELIST, self._settings.CAPABILITY_SYNC_VERSION, - self._settings.CAPABILITY_GET_BEST_BLOCKCHAIN + self._settings.CAPABILITY_GET_BEST_BLOCKCHAIN, + self._settings.CAPABILITY_IPV6, ] def start(self) -> None: diff --git a/hathor/p2p/manager.py b/hathor/p2p/manager.py index d53c7be83..56371da69 100644 --- a/hathor/p2p/manager.py +++ b/hathor/p2p/manager.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from collections import deque from typing import TYPE_CHECKING, Any, Iterable, NamedTuple, Optional from structlog import get_logger @@ -30,7 +31,7 @@ from hathor.p2p.peer_discovery import PeerDiscovery from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint from hathor.p2p.peer_id import PeerId -from hathor.p2p.peer_storage import UnverifiedPeerStorage, VerifiedPeerStorage +from hathor.p2p.peer_storage import VerifiedPeerStorage from hathor.p2p.protocol import HathorProtocol from hathor.p2p.rate_limiter import RateLimiter from hathor.p2p.states.ready import ReadyState @@ -81,10 +82,10 @@ class GlobalRateLimiter: manager: Optional['HathorManager'] connections: set[HathorProtocol] connected_peers: dict[PeerId, HathorProtocol] + new_connection_from_queue: deque[PeerId] connecting_peers: dict[IStreamClientEndpoint, _ConnectingPeer] handshaking_peers: set[HathorProtocol] whitelist_only: bool - unverified_peer_storage: UnverifiedPeerStorage verified_peer_storage: VerifiedPeerStorage _sync_factories: dict[SyncVersion, SyncAgentFactory] _enabled_sync_versions: set[SyncVersion] @@ -100,6 +101,8 
@@ def __init__( ssl: bool, rng: Random, whitelist_only: bool, + enable_ipv6: bool, + disable_ipv4: bool, ) -> None: self.log = logger.new() self._settings = settings @@ -154,12 +157,12 @@ def __init__( # List of peers connected and ready to communicate. self.connected_peers = {} - # List of peers received from the network. - # We cannot trust their identity before we connect to them. - self.unverified_peer_storage = UnverifiedPeerStorage() + # Queue of ready peer-id's used by connect_to_peer_from_connection_queue to choose the next peer to pull a + # random new connection from + self.new_connection_from_queue = deque() # List of known peers. - self.verified_peer_storage = VerifiedPeerStorage() # dict[string (peer.id), PublicPeer] + self.verified_peer_storage = VerifiedPeerStorage(rng=self.rng, max_size=self._settings.MAX_VERIFIED_PEERS) # Maximum unseen time before removing a peer (seconds). self.max_peer_unseen_dt: float = 30 * 60 # 30-minutes @@ -179,6 +182,11 @@ def __init__( # Timestamp of the last time sync was updated. self._last_sync_rotate: float = 0. + # Connect to new peers in a timed loop, instead of as soon as possible + self.lc_connect = LoopingCall(self.connect_to_peer_from_connection_queue) + self.lc_connect.clock = self.reactor + self.lc_connect_interval = 0.2 # seconds + # A timer to try to reconnect to the disconnect known peers. if self._settings.ENABLE_PEER_WHITELIST: self.wl_reconnect = LoopingCall(self.update_whitelist) @@ -190,6 +198,12 @@ def __init__( # Parameter to explicitly enable whitelist-only mode, when False it will still check the whitelist for sync-v1 self.whitelist_only = whitelist_only + # Parameter to enable IPv6 connections + self.enable_ipv6 = enable_ipv6 + + # Parameter to disable IPv4 connections + self.disable_ipv4 = disable_ipv4 + # Timestamp when the last discovery ran self._last_discovery: float = 0. @@ -264,7 +278,7 @@ def do_discovery(self) -> None: Do a discovery and connect on all discovery strategies. 
""" for peer_discovery in self.peer_discoveries: - coro = peer_discovery.discover_and_connect(self.connect_to) + coro = peer_discovery.discover_and_connect(self.connect_to_endpoint) Deferred.fromCoroutine(coro) def disable_rate_limiter(self) -> None: @@ -285,6 +299,7 @@ def start(self) -> None: if self.manager is None: raise TypeError('Class was built incorrectly without a HathorManager.') + self._start_peer_connect_loop() self.lc_reconnect.start(5, now=False) self.lc_sync_update.start(self.lc_sync_update_interval, now=False) @@ -311,7 +326,28 @@ def _handle_whitelist_reconnect_err(self, *args: Any, **kwargs: Any) -> None: self.log.error('whitelist reconnect had an exception. Start looping call again.', args=args, kwargs=kwargs) self.reactor.callLater(30, self._start_whitelist_reconnect) + def _start_peer_connect_loop(self) -> None: + # The deferred returned by the LoopingCall start method + # executes when the looping call stops running + # https://docs.twistedmatrix.com/en/stable/api/twisted.internet.task.LoopingCall.html + d = self.lc_connect.start(self.lc_connect_interval, now=True) + d.addErrback(self._handle_peer_connect_err) + + def _handle_peer_connect_err(self, *args: Any, **kwargs: Any) -> None: + # This method will be called when an exception happens inside the peer connect loop + # and ends up stopping the looping call. + # We log the error and start the looping call again. + self.log.error( + 'connect_to_peer_from_connection_queue had an exception. 
Start looping call again.', + args=args, + kwargs=kwargs, + ) + self.reactor.callLater(self.lc_connect_interval, self._start_peer_connect_loop) + def stop(self) -> None: + if self.lc_connect.running: + self.lc_connect.stop() + if self.lc_reconnect.running: self.lc_reconnect.stop() @@ -398,10 +434,10 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None: """Called when a peer is ready.""" assert protocol.peer is not None self.verified_peer_storage.add_or_replace(protocol.peer) - assert protocol.peer.id is not None self.handshaking_peers.remove(protocol) - self.unverified_peer_storage.pop(protocol.peer.id, None) + for conn in self.iter_all_connections(): + conn.unverified_peer_storage.remove(protocol.peer) # we emit the event even if it's a duplicate peer as a matching # NETWORK_PEER_DISCONNECTED will be emitted regardless @@ -411,7 +447,8 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None: peers_count=self._get_peers_count() ) - if protocol.peer.id in self.connected_peers: + peer_id = protocol.peer.id + if peer_id in self.connected_peers: # connected twice to same peer self.log.warn('duplicate connection to peer', protocol=protocol) conn = self.get_connection_to_drop(protocol) @@ -420,7 +457,11 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None: # the new connection is being dropped, so don't save it to connected_peers return - self.connected_peers[protocol.peer.id] = protocol + self.connected_peers[peer_id] = protocol + if peer_id not in self.new_connection_from_queue: + self.new_connection_from_queue.append(peer_id) + else: + self.log.warn('peer already in queue', peer=str(peer_id)) # In case it was a retry, we must reset the data only here, after it gets ready protocol.peer.info.reset_retry_timestamp() @@ -428,7 +469,7 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None: if len(self.connected_peers) <= self.MAX_ENABLED_SYNC: protocol.enable_sync() - if protocol.peer.id in self.always_enable_sync: + if peer_id in 
self.always_enable_sync: protocol.enable_sync() # Notify other peers about this new peer connection. @@ -448,7 +489,8 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None: if protocol in self.handshaking_peers: self.handshaking_peers.remove(protocol) if protocol._peer is not None: - existing_protocol = self.connected_peers.pop(protocol.peer.id, None) + peer_id = protocol.peer.id + existing_protocol = self.connected_peers.pop(peer_id, None) if existing_protocol is None: # in this case, the connection was closed before it got to READY state return @@ -458,7 +500,10 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None: # A check for duplicate connections is done during PEER_ID state, but there's still a # chance it can happen if both connections start at the same time and none of them has # reached READY state while the other is on PEER_ID state - self.connected_peers[protocol.peer.id] = existing_protocol + self.connected_peers[peer_id] = existing_protocol + elif peer_id in self.new_connection_from_queue: + # now we're sure it can be removed from new_connection_from_queue + self.new_connection_from_queue.remove(peer_id) self.pubsub.publish( HathorEvents.NETWORK_PEER_DISCONNECTED, protocol=protocol, @@ -491,15 +536,6 @@ def is_peer_connected(self, peer_id: PeerId) -> bool: """ return peer_id in self.connected_peers - def on_receive_peer(self, peer: UnverifiedPeer, origin: Optional[ReadyState] = None) -> None: - """ Update a peer information in our storage, and instantly attempt to connect - to it if it is not connected yet. 
- """ - if peer.id == self.my_peer.id: - return - peer = self.unverified_peer_storage.add_or_merge(peer) - self.connect_to_if_not_connected(peer, int(self.reactor.seconds())) - def peers_cleanup(self) -> None: """Clean up aged peers.""" now = self.reactor.seconds() @@ -515,11 +551,45 @@ def peers_cleanup(self) -> None: for remove_peer in to_be_removed: self.verified_peer_storage.remove(remove_peer) - def reconnect_to_all(self) -> None: - """ It is called by the `lc_reconnect` timer and tries to connect to all known - peers. + def connect_to_peer_from_connection_queue(self) -> None: + """ It is called by the `lc_connect` looping call and tries to connect to a new peer. + """ + if not self.new_connection_from_queue: + self.log.debug('connection queue is empty') + return + assert self.manager is not None + self.log.debug('connect to peer from connection queue') + candidate_new_peers: list[UnverifiedPeer] + # we don't know if we will find a candidate, so we can't do `while True:` + for _ in range(len(self.new_connection_from_queue)): + # for a deque([1, 2, 3, 4]) this will get 1 and modify it to deque([2, 3, 4, 1]) + next_from_peer_id = self.new_connection_from_queue[0] + self.new_connection_from_queue.rotate(-1) + + protocol = self.connected_peers.get(next_from_peer_id) + if protocol is None: + self.log.error('expected protocol not found', peer_id=str(next_from_peer_id)) + assert self.new_connection_from_queue.pop() == next_from_peer_id + continue + candidate_new_peers = [ + candidate_peer + for candidate_peer_id, candidate_peer in protocol.unverified_peer_storage.items() + if candidate_peer_id not in self.connected_peers or candidate_peer_id not in self.connecting_peers + ] + if candidate_new_peers: + break + else: + self.log.debug('no new peers in the connection queue') + # this means we rotated through the whole queue and did not find any candidate + return + + peer = self.rng.choice(candidate_new_peers) + self.log.debug('random peer chosen', peer=str(peer.id), 
entrypoints=peer.info.entrypoints_as_str()) + now = self.reactor.seconds() + self.connect_to_peer(peer, int(now)) - TODO(epnichols): Should we always connect to *all*? Should there be a max #? + def reconnect_to_all(self) -> None: + """ It is called by the `lc_reconnect` timer and tries to connect to all known peers. """ self.peers_cleanup() # when we have no connected peers left, run the discovery process again @@ -528,10 +598,10 @@ def reconnect_to_all(self) -> None: if now - self._last_discovery >= self.PEER_DISCOVERY_INTERVAL: self._last_discovery = now self.do_discovery() - # We need to use list() here because the dict might change inside connect_to_if_not_connected + # We need to use list() here because the dict might change inside connect_to_peer # when the peer is disconnected and without entrypoint for peer in list(self.verified_peer_storage.values()): - self.connect_to_if_not_connected(peer, int(now)) + self.connect_to_peer(peer, int(now)) def update_whitelist(self) -> Deferred[None]: from twisted.web.client import readBody @@ -574,10 +644,14 @@ def _update_whitelist_cb(self, body: bytes) -> None: for peer_id in peers_to_remove: self.manager.remove_peer_from_whitelist_and_disconnect(peer_id) - def connect_to_if_not_connected(self, peer: UnverifiedPeer | PublicPeer, now: int) -> None: + def connect_to_peer(self, peer: UnverifiedPeer | PublicPeer, now: int) -> None: """ Attempts to connect if it is not connected to the peer. 
""" - if not peer.info.entrypoints: + if not peer.info.entrypoints or ( + not self.enable_ipv6 and not peer.info.get_ipv4_only_entrypoints() + ) or ( + self.disable_ipv4 and not peer.info.get_ipv6_only_entrypoints() + ): # It makes no sense to keep storing peers that have disconnected and have no entrypoints # We will never be able to connect to them anymore and they will only keep spending memory # and other resources when used in APIs, so we are removing them here @@ -589,8 +663,17 @@ def connect_to_if_not_connected(self, peer: UnverifiedPeer | PublicPeer, now: in assert peer.id is not None if peer.info.can_retry(now): - addr = self.rng.choice(peer.info.entrypoints) - self.connect_to(addr.with_id(peer.id), peer) + if self.enable_ipv6 and not self.disable_ipv4: + addr = self.rng.choice(list(peer.info.entrypoints)) + elif self.enable_ipv6 and self.disable_ipv4: + addr = self.rng.choice(peer.info.get_ipv6_only_entrypoints()) + elif not self.enable_ipv6 and not self.disable_ipv4: + addr = self.rng.choice(peer.info.get_ipv4_only_entrypoints()) + else: + raise ValueError('IPv4 is disabled and IPv6 is not enabled') + self.connect_to_endpoint(addr.with_id(peer.id), peer) + else: + self.log.debug('connecting too often, skip retrying', peer=str(peer.id)) def _connect_to_callback( self, @@ -608,14 +691,17 @@ def _connect_to_callback( protocol.wrappedProtocol.on_outbound_connect(entrypoint, peer) self.connecting_peers.pop(endpoint) - def connect_to( + def connect_to_endpoint( self, entrypoint: PeerEndpoint, peer: UnverifiedPeer | PublicPeer | None = None, use_ssl: bool | None = None, ) -> None: - """ Attempt to connect to a peer, even if a connection already exists. - Usually you should call `connect_to_if_not_connected`. + """ Attempt to connect directly to an endpoint, prefer calling `connect_to_peer` when possible. + + This method does not take into account the peer's id (since we might not even know it, or have verified it even + if we know). 
But this method will check if there's already a connection open to the given endpoint and skip it + if there is one. If `use_ssl` is True, then the connection will be wraped by a TLS. """ @@ -636,6 +722,14 @@ def connect_to( self.log.debug('skip because of simple localhost check', entrypoint=str(entrypoint)) return + if not self.enable_ipv6 and entrypoint.addr.is_ipv6(): + self.log.info('skip because IPv6 is disabled', entrypoint=entrypoint) + return + + if self.disable_ipv4 and entrypoint.addr.is_ipv4(): + self.log.info('skip because IPv4 is disabled', entrypoint=entrypoint) + return + if use_ssl is None: use_ssl = self.use_ssl @@ -719,7 +813,7 @@ def update_hostname_entrypoints(self, *, old_hostname: str | None, new_hostname: def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Address) -> None: hostname_entrypoint = PeerAddress.from_hostname_address(hostname, address) - self.my_peer.info.entrypoints.append(hostname_entrypoint) + self.my_peer.info.entrypoints.add(hostname_entrypoint) def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol: """ When there are duplicate connections, determine which one should be dropped. diff --git a/hathor/p2p/peer.py b/hathor/p2p/peer.py index 53f43369d..f3e0dfa05 100644 --- a/hathor/p2p/peer.py +++ b/hathor/p2p/peer.py @@ -106,24 +106,33 @@ class PeerInfo: """ Stores entrypoint and connection attempts information. """ - entrypoints: list[PeerAddress] = field(default_factory=list) - retry_timestamp: int = 0 # should only try connecting to this peer after this timestamp - retry_interval: int = 5 # how long to wait for next connection retry. 
It will double for each failure - retry_attempts: int = 0 # how many retries were made - last_seen: float = inf # last time this peer was seen + entrypoints: set[PeerAddress] = field(default_factory=set) + retry_timestamp: int = 0 # should only try connecting to this peer after this timestamp + retry_interval: int = 5 # how long to wait for next connection retry. It will double for each failure + retry_attempts: int = 0 # how many retries were made + last_seen: float = inf # last time this peer was seen flags: set[str] = field(default_factory=set) _settings: HathorSettings = field(default_factory=get_global_settings, repr=False) + def get_ipv4_only_entrypoints(self) -> list[PeerAddress]: + return list(filter(lambda e: not e.is_ipv6(), self.entrypoints)) + + def get_ipv6_only_entrypoints(self) -> list[PeerAddress]: + return list(filter(lambda e: e.is_ipv6(), self.entrypoints)) + + def ipv4_entrypoints_as_str(self) -> list[str]: + return sorted(map(str, self.get_ipv4_only_entrypoints())) + + def ipv6_entrypoints_as_str(self) -> list[str]: + return sorted(map(str, self.get_ipv6_only_entrypoints())) + def entrypoints_as_str(self) -> list[str]: """Return a list of entrypoints serialized as str""" - return list(map(str, self.entrypoints)) + return sorted(map(str, self.entrypoints)) def _merge(self, other: PeerInfo) -> None: """Actual merge execution, must only be made after verifications.""" - # Merge entrypoints. - for ep in other.entrypoints: - if ep not in self.entrypoints: - self.entrypoints.append(ep) + self.entrypoints.update(other.entrypoints) async def validate_entrypoint(self, protocol: HathorProtocol) -> bool: """ Validates if connection entrypoint is one of the peer entrypoints @@ -203,14 +212,19 @@ class UnverifiedPeer: id: PeerId info: PeerInfo = field(default_factory=PeerInfo) - def to_json(self) -> dict[str, Any]: + def to_json(self, only_ipv4_entrypoints: bool = True) -> dict[str, Any]: """ Return a JSON serialization of the object. 
This format is compatible with libp2p. """ + if only_ipv4_entrypoints: + entrypoints_as_str = self.info.ipv4_entrypoints_as_str() + else: + entrypoints_as_str = self.info.entrypoints_as_str() + return { 'id': str(self.id), - 'entrypoints': self.info.entrypoints_as_str(), + 'entrypoints': entrypoints_as_str, } @classmethod @@ -220,7 +234,7 @@ def create_from_json(cls, data: dict[str, Any]) -> Self: It is to create an UnverifiedPeer from a peer connection. """ peer_id = PeerId(data['id']) - endpoints = [] + endpoints = set() for endpoint_str in data.get('entrypoints', []): # We have to parse using PeerEndpoint to be able to support older peers that still @@ -228,12 +242,14 @@ def create_from_json(cls, data: dict[str, Any]) -> Self: endpoint = PeerEndpoint.parse(endpoint_str) if endpoint.peer_id is not None and endpoint.peer_id != peer_id: raise ValueError(f'conflicting peer_id: {endpoint.peer_id} != {peer_id}') - endpoints.append(endpoint.addr) + endpoints.add(endpoint.addr) - return cls( + obj = cls( id=peer_id, info=PeerInfo(entrypoints=endpoints), ) + obj.validate() + return obj def merge(self, other: UnverifiedPeer) -> None: """ Merge two UnverifiedPeer objects, checking that they have the same @@ -242,6 +258,12 @@ def merge(self, other: UnverifiedPeer) -> None: """ assert self.id == other.id self.info._merge(other.info) + self.validate() + + def validate(self) -> None: + """Check if there are too many entrypoints.""" + if len(self.info.entrypoints) > self.info._settings.PEER_MAX_ENTRYPOINTS: + raise InvalidPeerIdException('too many entrypoints') @dataclass(slots=True) diff --git a/hathor/p2p/peer_discovery/bootstrap.py b/hathor/p2p/peer_discovery/bootstrap.py index 55b5e9f16..23399e2ed 100644 --- a/hathor/p2p/peer_discovery/bootstrap.py +++ b/hathor/p2p/peer_discovery/bootstrap.py @@ -37,6 +37,6 @@ def __init__(self, entrypoints: list[PeerEndpoint]): self.entrypoints = entrypoints @override - async def discover_and_connect(self, connect_to: 
Callable[[PeerEndpoint], None]) -> None: + async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None: for entrypoint in self.entrypoints: - connect_to(entrypoint) + connect_to_endpoint(entrypoint) diff --git a/hathor/p2p/peer_discovery/dns.py b/hathor/p2p/peer_discovery/dns.py index c5dfe74d6..9ef792a96 100644 --- a/hathor/p2p/peer_discovery/dns.py +++ b/hathor/p2p/peer_discovery/dns.py @@ -53,13 +53,13 @@ def do_lookup_text(self, host: str) -> Deferred[LookupResult]: return lookupText(host) @override - async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None: + async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None: """ Run DNS lookup for host and connect to it This is executed when starting the DNS Peer Discovery and first connecting to the network """ for host in self.hosts: for entrypoint in (await self.dns_seed_lookup(host)): - connect_to(entrypoint) + connect_to_endpoint(entrypoint) async def dns_seed_lookup(self, host: str) -> set[PeerEndpoint]: """ Run a DNS lookup for TXT, A, and AAAA records and return a list of connection strings. diff --git a/hathor/p2p/peer_discovery/peer_discovery.py b/hathor/p2p/peer_discovery/peer_discovery.py index 7d040fae2..ae8ee626b 100644 --- a/hathor/p2p/peer_discovery/peer_discovery.py +++ b/hathor/p2p/peer_discovery/peer_discovery.py @@ -23,10 +23,10 @@ class PeerDiscovery(ABC): """ @abstractmethod - async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None: - """ This method must discover the peers and call `connect_to` for each of them. + async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None: + """ This method must discover the peers and call `connect_to_endpoint` for each of them. - :param connect_to: Function which will be called for each discovered peer. 
- :type connect_to: function + :param connect_to_endpoint: Function which will be called for each discovered peer. + :type connect_to_endpoint: function """ raise NotImplementedError diff --git a/hathor/p2p/peer_endpoint.py b/hathor/p2p/peer_endpoint.py index c7cafce20..b98ec28fc 100644 --- a/hathor/p2p/peer_endpoint.py +++ b/hathor/p2p/peer_endpoint.py @@ -14,13 +14,14 @@ from __future__ import annotations +import re from dataclasses import dataclass from enum import Enum from typing import Any from urllib.parse import parse_qs, urlparse from twisted.internet.address import IPv4Address, IPv6Address -from twisted.internet.endpoints import TCP4ClientEndpoint +from twisted.internet.endpoints import TCP4ClientEndpoint, TCP6ClientEndpoint from twisted.internet.interfaces import IAddress, IStreamClientEndpoint from typing_extensions import Self @@ -32,6 +33,40 @@ 'instead, compare the addr attribute explicitly, and if relevant, the peer_id too.' ) +""" +This Regex will match any valid IPv6 address. 
+ +Some examples that will match: + '::' + '::1' + '2001:0db8:85a3:0000:0000:8a2e:0370:7334' + '2001:db8:85a3:0:0:8a2e:370:7334' + '2001:db8::8a2e:370:7334' + '2001:db8:0:0:0:0:2:1' + '1234::5678' + 'fe80::' + '::abcd:abcd:abcd:abcd:abcd:abcd' + '0:0:0:0:0:0:0:1' + '0:0:0:0:0:0:0:0' + +Some examples that won't match: + '127.0.0.1' --> # IPv4 + '1200::AB00:1234::2552:7777:1313' --> # double '::' + '2001:db8::g123' --> # invalid character + '2001:db8::85a3::7334' --> # double '::' + '2001:db8:85a3:0000:0000:8a2e:0370:7334:1234' --> # too many groups + '12345::abcd' --> # too many characters in a group + '2001:db8:85a3:8a2e:0370' --> # too few groups + '2001:db8:85a3::8a2e:3707334' --> # too many characters in a group + '1234:56789::abcd' --> # too many characters in a group + ':2001:db8::1' --> # invalid start + '2001:db8::1:' --> # invalid end +""" +IPV6_REGEX = re.compile(r'''^(([0-9a-fA-F]{1,4}:){7}([0-9a-fA-F]{1,4}|:)|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:))$''') # noqa: E501 + +# A host with length 64 and over would be rejected later by twisted +MAX_HOST_LEN = 63 + class Protocol(Enum): TCP = 'tcp' @@ -46,7 +81,11 @@ class PeerAddress: port: int def __str__(self) -> str: - return f'{self.protocol.value}://{self.host}:{self.port}' + host = self.host + if self.is_ipv6(): + host = f'[{self.host}]' + + return f'{self.protocol.value}://{host}:{self.port}' def __eq__(self, other: Any) -> bool: """ @@ -138,9 +177,11 @@ def from_address(cls, address: IAddress) -> Self: def to_client_endpoint(self, reactor: Reactor) -> IStreamClientEndpoint: """This method generates a twisted client endpoint that has a .connect() method.""" - # XXX: currently we don't support IPv6, but when we 
do we have to decide between TCP4ClientEndpoint and - # TCP6ClientEndpoint, when the host is an IP address that is easy, but when it is a DNS hostname, we will not - # know which to use until we know which resource records it holds (A or AAAA) + # XXX: currently we only support IPv6 IPs, not hosts resolving to AAAA records. + # To support them we would have to perform DNS queries to resolve + # the host and check which record it holds (A or AAAA). + if self.is_ipv6(): + return TCP6ClientEndpoint(reactor, self.host, self.port) return TCP4ClientEndpoint(reactor, self.host, self.port) def is_localhost(self) -> bool: @@ -157,7 +198,18 @@ def is_localhost(self) -> bool: >>> PeerAddress.parse('tcp://foo.bar:444').is_localhost() False """ - return self.host in ('127.0.0.1', 'localhost') + return self.host in ('127.0.0.1', 'localhost', '::1') + + def is_ipv6(self) -> bool: + """Used to determine if the entrypoint host is an IPv6 address. + """ + # XXX: This means we don't currently consider DNS names that resolve to IPv6 addresses as IPv6. + return IPV6_REGEX.fullmatch(self.host) is not None + + def is_ipv4(self) -> bool: + """Used to determine if the entrypoint host is an IPv4 address. + """ + return not self.is_ipv6() def with_id(self, peer_id: PeerId | None = None) -> PeerEndpoint: """Create a PeerEndpoint instance with self as the address and with the provided peer_id, or None.""" @@ -212,6 +264,14 @@ def parse(cls, description: str) -> PeerEndpoint: >>> str(PeerEndpoint.parse('tcp://foo.bar.baz:40403/')) 'tcp://foo.bar.baz:40403' + >>> str(PeerEndpoint.parse('tcp://foooooooooooooooooooo.baaaaaaaaaaaaaaaaaar.baaaaaaaaaaaaaaaaaaz:40403/')) + 'tcp://foooooooooooooooooooo.baaaaaaaaaaaaaaaaaar.baaaaaaaaaaaaaaaaaaz:40403' + + >>> PeerEndpoint.parse('tcp://foooooooooooooooooooo.baaaaaaaaaaaaaaaaaar.baaaaaaaaaaaaaaaaaazz:40403/') + Traceback (most recent call last): + ... 
+ ValueError: hostname too long + >>> PeerEndpoint.parse('tcp://127.0.0.1:40403/?id=123') Traceback (most recent call last): ... @@ -268,6 +328,8 @@ def _parse_address_parts(description: str) -> tuple[Protocol, str, int, str]: host = url.hostname if host is None: raise ValueError(f'expected a host: "{description}"') + if len(host) > MAX_HOST_LEN: + raise ValueError('hostname too long') port = url.port if port is None: raise ValueError(f'expected a port: "{description}"') diff --git a/hathor/p2p/peer_storage.py b/hathor/p2p/peer_storage.py index b6a433077..6f3744439 100644 --- a/hathor/p2p/peer_storage.py +++ b/hathor/p2p/peer_storage.py @@ -18,6 +18,7 @@ from hathor.p2p.peer import PublicPeer, UnverifiedPeer from hathor.p2p.peer_id import PeerId +from hathor.util import Random class GenericPeer(Protocol): @@ -36,6 +37,18 @@ class _BasePeerStorage(dict[PeerId, PeerType]): """ Base class for VerifiedPeerStorage and UnverifiedPeerStorage, do not use directly. """ + def __init__(self, *, rng: Random, max_size: int) -> None: + self.rng = rng + self.max_size = max_size + + def _ensure_max_size(self) -> None: + to_remove_count = len(self) - self.max_size + if to_remove_count < 1: + return + to_remove = self.rng.sample(list(self.keys()), k=to_remove_count) + for k in to_remove: + self.pop(k) + def add(self, peer: PeerType) -> None: """ Add a new peer to the storage. @@ -45,6 +58,7 @@ def add(self, peer: PeerType) -> None: if peer.id in self: raise ValueError('Peer has already been added') self[peer.id] = peer + self._ensure_max_size() def add_or_merge(self, peer: PeerType) -> PeerType: """ Add a peer to the storage if it has not been added yet. Otherwise, merge it with the existing peer. @@ -76,14 +90,16 @@ def remove(self, peer: GenericPeer) -> None: class VerifiedPeerStorage(_BasePeerStorage[PublicPeer]): - """ VerifiedPeerStorage is used to store all peers that we have connected to and verified. + """ Used to store all peers that we have connected to and verified. 
- It is a dict of PublicPeer objects, and peers can be retrieved by their `peer.id`. + It is a dict of `PublicPeer` objects that should live in the `ConnectionsManager`, the keys are the `peer.id` of + the remote peers. """ class UnverifiedPeerStorage(_BasePeerStorage[UnverifiedPeer]): - """ UnverifiedPeerStorage is used to store all received peers, we haven't verified their ids/entrypoints yet. + """ Used to store peers received from the network whose ids/entrypoints we haven't verified yet. - It is a dict of Peer objects, and peers can be retrieved by their `peer.id`. + It is a dict of `UnverifiedPeer` objects that should live in a `HathorProtocol`, the keys are the `peer.id` of + the remote peers. """ diff --git a/hathor/p2p/protocol.py b/hathor/p2p/protocol.py index cd90601e8..b582fcb77 100644 --- a/hathor/p2p/protocol.py +++ b/hathor/p2p/protocol.py @@ -29,6 +29,7 @@ from hathor.p2p.peer import PrivatePeer, PublicPeer, UnverifiedPeer from hathor.p2p.peer_endpoint import PeerEndpoint from hathor.p2p.peer_id import PeerId +from hathor.p2p.peer_storage import UnverifiedPeerStorage from hathor.p2p.rate_limiter import RateLimiter from hathor.p2p.states import BaseState, HelloState, PeerIdState, ReadyState from hathor.p2p.sync_version import SyncVersion @@ -164,6 +165,13 @@ def __init__( self.use_ssl: bool = use_ssl + # List of peers received from the network. + # We cannot trust their identity before we connect to them. + self.unverified_peer_storage = UnverifiedPeerStorage( + rng=self.connections.rng, + max_size=self._settings.MAX_UNVERIFIED_PEERS_PER_CONN, + ) + # Protocol version is initially unset self.sync_version = None @@ -368,6 +376,20 @@ def disconnect(self, reason: str = '', *, force: bool = False) -> None: else: transport.abortConnection() + def on_receive_peer(self, peer: UnverifiedPeer) -> None: + """ Update a peer information in our storage, the manager's connection loop will pick it later. 
+ """ + # ignore when the remote echo backs our own peer + if peer.id == self.my_peer.id: + return + # ignore peers we've already connected to + if peer.id in self.connections.verified_peer_storage: + return + # merge with known previous information received from this peer since we don't know what's right (a peer can + # change their entrypoints, but the old could still echo, since we haven't connected yet don't assume anything + # and just merge them) + self.unverified_peer_storage.add_or_merge(peer) + def handle_error(self, payload: str) -> None: """ Executed when an ERROR command is received. """ diff --git a/hathor/p2p/resources/add_peers.py b/hathor/p2p/resources/add_peers.py index c8faeb5dc..fcfe9732d 100644 --- a/hathor/p2p/resources/add_peers.py +++ b/hathor/p2p/resources/add_peers.py @@ -86,7 +86,7 @@ def already_connected(endpoint: PeerEndpoint) -> bool: pd = BootstrapPeerDiscovery(filtered_peers) # this fires and forget the coroutine, which is compatible with the original behavior - coro = pd.discover_and_connect(self.manager.connections.connect_to) + coro = pd.discover_and_connect(self.manager.connections.connect_to_endpoint) Deferred.fromCoroutine(coro) ret = {'success': True, 'peers': [str(p) for p in filtered_peers]} diff --git a/hathor/p2p/states/peer_id.py b/hathor/p2p/states/peer_id.py index 77e8a051e..e46e62ce9 100644 --- a/hathor/p2p/states/peer_id.py +++ b/hathor/p2p/states/peer_id.py @@ -42,6 +42,12 @@ def __init__(self, protocol: 'HathorProtocol', settings: HathorSettings) -> None self.my_peer_ready = False self.other_peer_ready = False + # Common capabilities between the two peers + common_capabilities = protocol.capabilities & set(protocol.node.capabilities) + + # whether to relay IPV6 entrypoints + self.should_relay_ipv6_entrypoints: bool = self._settings.CAPABILITY_IPV6 in common_capabilities + def on_enter(self) -> None: self.send_peer_id() @@ -65,10 +71,16 @@ def handle_ready(self, payload: str) -> None: def _get_peer_id_data(self) -> 
dict[str, Any]: my_peer = self.protocol.my_peer + + if not self.should_relay_ipv6_entrypoints: + entrypoints_as_str = my_peer.info.ipv4_entrypoints_as_str() + else: + entrypoints_as_str = my_peer.info.entrypoints_as_str() + return dict( id=str(my_peer.id), pubKey=my_peer.get_public_key(), - entrypoints=my_peer.info.entrypoints_as_str(), + entrypoints=entrypoints_as_str, ) def send_peer_id(self) -> None: diff --git a/hathor/p2p/states/ready.py b/hathor/p2p/states/ready.py index 1bed1c745..7d10dcc98 100644 --- a/hathor/p2p/states/ready.py +++ b/hathor/p2p/states/ready.py @@ -96,6 +96,9 @@ def __init__(self, protocol: 'HathorProtocol', settings: HathorSettings) -> None ProtocolMessages.BEST_BLOCKCHAIN: self.handle_best_blockchain, }) + # whether to relay IPV6 entrypoints + self.should_relay_ipv6_entrypoints: bool = self._settings.CAPABILITY_IPV6 in common_capabilities + # Initialize sync manager and add its commands to the list of available commands. connections = self.protocol.connections assert connections is not None @@ -163,8 +166,16 @@ def send_peers(self, peer_list: Iterable[PublicPeer]) -> None: """ data = [] for peer in peer_list: - if peer.info.entrypoints: - data.append(peer.to_unverified_peer().to_json()) + if self.should_relay_ipv6_entrypoints and not peer.info.entrypoints: + self.log.debug('no entrypoints to relay', peer=str(peer.id)) + continue + + if not self.should_relay_ipv6_entrypoints and not peer.info.get_ipv4_only_entrypoints(): + self.log.debug('no ipv4 entrypoints to relay', peer=str(peer.id)) + continue + + data.append(peer.to_unverified_peer().to_json( + only_ipv4_entrypoints=not self.should_relay_ipv6_entrypoints)) self.send_message(ProtocolMessages.PEERS, json_dumps(data)) self.log.debug('send peers', peers=data) @@ -176,7 +187,7 @@ def handle_peers(self, payload: str) -> None: for data in received_peers: peer = UnverifiedPeer.create_from_json(data) if self.protocol.connections: - self.protocol.connections.on_receive_peer(peer, origin=self) 
+ self.protocol.on_receive_peer(peer) self.log.debug('received peers', payload=payload) def send_ping_if_necessary(self) -> None: diff --git a/hathor/simulator/fake_connection.py b/hathor/simulator/fake_connection.py index b3a29afc9..4f569e818 100644 --- a/hathor/simulator/fake_connection.py +++ b/hathor/simulator/fake_connection.py @@ -19,7 +19,7 @@ from OpenSSL.crypto import X509 from structlog import get_logger -from twisted.internet.address import IPv4Address +from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.testing import StringTransport from hathor.p2p.peer import PrivatePeer @@ -34,7 +34,7 @@ class HathorStringTransport(StringTransport): - def __init__(self, peer: PrivatePeer, *, peer_address: IPv4Address): + def __init__(self, peer: PrivatePeer, *, peer_address: IPv4Address | IPv6Address): super().__init__(peerAddress=peer_address) self._peer = peer @@ -58,8 +58,8 @@ def __init__( *, latency: float = 0, autoreconnect: bool = False, - addr1: IPv4Address | None = None, - addr2: IPv4Address | None = None, + addr1: IPv4Address | IPv6Address | None = None, + addr2: IPv4Address | IPv6Address | None = None, fake_bootstrap_id: PeerId | None | Literal[False] = False, ): """ @@ -275,9 +275,10 @@ def reconnect(self) -> None: self._proto1 = self.manager1.connections.server_factory.buildProtocol(self.addr2) self._proto2 = self.manager2.connections.client_factory.buildProtocol(self.addr1) - # When _fake_bootstrap_id is set we don't pass the peer because that's how bootstrap calls connect_to() + # When _fake_bootstrap_id is set we don't pass the peer because that's how bootstrap calls + # connect_to_endpoint() peer = self._proto1.my_peer.to_unverified_peer() if self._fake_bootstrap_id is False else None - self.manager2.connections.connect_to(self.entrypoint, peer) + self.manager2.connections.connect_to_endpoint(self.entrypoint, peer) connecting_peers = list(self.manager2.connections.connecting_peers.values()) for connecting_peer in 
connecting_peers: diff --git a/tests/cli/test_run_node.py b/tests/cli/test_run_node.py index 3b72a2592..84d73d2ef 100644 --- a/tests/cli/test_run_node.py +++ b/tests/cli/test_run_node.py @@ -20,6 +20,7 @@ def register_signal_handlers(self) -> None: @patch('twisted.internet.reactor.listenTCP') def test_listen_tcp_ipv4(self, mock_listenTCP): + """Should call listenTCP with no interface defined when using only IPv4""" class CustomRunNode(RunNode): def start_manager(self) -> None: pass @@ -31,3 +32,31 @@ def register_signal_handlers(self) -> None: self.assertTrue(run_node is not None) mock_listenTCP.assert_called_with(1234, ANY) + + @patch('twisted.internet.reactor.listenTCP') + def test_listen_tcp_ipv6(self, mock_listenTCP): + """Should call listenTCP with interface='::0' when enabling IPv6""" + class CustomRunNode(RunNode): + def start_manager(self) -> None: + pass + + def register_signal_handlers(self) -> None: + pass + + run_node = CustomRunNode(argv=['--memory-storage', '--x-enable-ipv6', '--status', '1234']) + self.assertTrue(run_node is not None) + + mock_listenTCP.assert_called_with(1234, ANY, interface='::0') + + def test_validate_ipv4_or_ipv6(self): + """The program should exit if no IP version is enabled""" + class CustomRunNode(RunNode): + def start_manager(self) -> None: + pass + + def register_signal_handlers(self) -> None: + pass + + # Should call system exit + with self.assertRaises(SystemExit): + CustomRunNode(argv=['--memory-storage', '--x-disable-ipv4', '--status', '1234']) diff --git a/tests/others/test_cli_builder.py b/tests/others/test_cli_builder.py index a83f00899..d1a95c6e6 100644 --- a/tests/others/test_cli_builder.py +++ b/tests/others/test_cli_builder.py @@ -192,4 +192,4 @@ def test_event_queue_with_memory_storage(self): def test_event_queue_with_full_verification(self): args = ['--x-enable-event-queue', '--memory-storage', '--x-full-verification'] - self._build_with_error(args, '--x-full-verification cannot be used with 
--x-enable-event-queue') + self._build_with_error(args, '--x-full-verification cannot be used with --enable-event-queue') diff --git a/tests/others/test_metrics.py b/tests/others/test_metrics.py index b46f6985b..0fb201377 100644 --- a/tests/others/test_metrics.py +++ b/tests/others/test_metrics.py @@ -72,7 +72,7 @@ def test_connections_manager_integration(self): # Execution endpoint = PeerEndpoint.parse('tcp://127.0.0.1:8005') # This will trigger sending to the pubsub one of the network events - manager.connections.connect_to(endpoint, use_ssl=True) + manager.connections.connect_to_endpoint(endpoint, use_ssl=True) self.run_to_completion() diff --git a/tests/p2p/test_bootstrap.py b/tests/p2p/test_bootstrap.py index 82aa932bb..7ae668057 100644 --- a/tests/p2p/test_bootstrap.py +++ b/tests/p2p/test_bootstrap.py @@ -19,10 +19,10 @@ def __init__(self, mocked_host_ports: list[tuple[str, int]]): self.mocked_host_ports = mocked_host_ports @override - async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None: + async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None: for host, port in self.mocked_host_ports: addr = PeerAddress(Protocol.TCP, host, port) - connect_to(addr.with_id()) + connect_to_endpoint(addr.with_id()) class MockDNSPeerDiscovery(DNSPeerDiscovery): @@ -50,7 +50,18 @@ class BootstrapTestCase(unittest.TestCase): def test_mock_discovery(self) -> None: pubsub = PubSubManager(self.clock) peer = PrivatePeer.auto_generated() - connections = ConnectionsManager(self._settings, self.clock, peer, pubsub, True, self.rng, True) + connections = ConnectionsManager( + self._settings, + self.clock, + peer, + pubsub, + True, + self.rng, + True, + enable_ipv6=False, + disable_ipv4=False + ) + host_ports1 = [ ('foobar', 1234), ('127.0.0.99', 9999), @@ -74,7 +85,18 @@ def test_mock_discovery(self) -> None: def test_dns_discovery(self) -> None: pubsub = PubSubManager(self.clock) peer = 
PrivatePeer.auto_generated() - connections = ConnectionsManager(self._settings, self.clock, peer, pubsub, True, self.rng, True) + connections = ConnectionsManager( + self._settings, + self.clock, + peer, + pubsub, + True, + self.rng, + True, + enable_ipv6=False, + disable_ipv4=False + ) + bootstrap_a = [ '127.0.0.99', '127.0.0.88', diff --git a/tests/p2p/test_connections.py b/tests/p2p/test_connections.py index b27897ca4..a0c910dda 100644 --- a/tests/p2p/test_connections.py +++ b/tests/p2p/test_connections.py @@ -18,8 +18,74 @@ def test_manager_connections(self) -> None: manager: HathorManager = self.create_peer('testnet', enable_sync_v1=True, enable_sync_v2=False) endpoint = PeerEndpoint.parse('tcp://127.0.0.1:8005') - manager.connections.connect_to(endpoint, use_ssl=True) + manager.connections.connect_to_endpoint(endpoint, use_ssl=True) self.assertIn(endpoint, manager.connections.iter_not_ready_endpoints()) self.assertNotIn(endpoint, manager.connections.iter_ready_connections()) self.assertNotIn(endpoint, manager.connections.iter_all_connections()) + + def test_manager_disabled_ipv6(self) -> None: + """Should not try to connect to ipv6 peers if ipv6 is disabled""" + + manager = self.create_peer( + 'testnet', + enable_sync_v1=False, + enable_sync_v2=True, + enable_ipv6=False, + disable_ipv4=False + ) + + endpoint = PeerEndpoint.parse('tcp://[::1]:8005') + manager.connections.connect_to_endpoint(endpoint, use_ssl=True) + + self.assertNotIn(endpoint, manager.connections.iter_not_ready_endpoints()) + self.assertNotIn(endpoint, manager.connections.iter_ready_connections()) + self.assertNotIn(endpoint, manager.connections.iter_all_connections()) + + def test_manager_enabled_ipv6_and_ipv4(self) -> None: + """Should connect to both ipv4 and ipv6 peers if both are enabled""" + + manager = self.create_peer( + 'testnet', + enable_sync_v1=False, + enable_sync_v2=True, + enable_ipv6=True, + disable_ipv4=False + ) + + endpoint_ipv6 = PeerEndpoint.parse('tcp://[::3:2:1]:8005') 
+ manager.connections.connect_to_endpoint(endpoint_ipv6, use_ssl=True) + + endpoint_ipv4 = PeerEndpoint.parse('tcp://1.2.3.4:8005') + manager.connections.connect_to_endpoint(endpoint_ipv4, use_ssl=True) + + self.assertIn( + endpoint_ipv4.addr.host, + list(map(lambda x: x.addr.host, manager.connections.iter_not_ready_endpoints())) + ) + self.assertIn( + endpoint_ipv6.addr.host, + list(map(lambda x: x.addr.host, manager.connections.iter_not_ready_endpoints())) + ) + + self.assertEqual(2, len(list(manager.connections.iter_not_ready_endpoints()))) + self.assertEqual(0, len(list(manager.connections.iter_ready_connections()))) + self.assertEqual(0, len(list(manager.connections.iter_all_connections()))) + + def test_manager_disabled_ipv4(self) -> None: + """Should not try to connect to ipv4 peers if ipv4 is disabled""" + + manager = self.create_peer( + 'testnet', + enable_sync_v1=False, + enable_sync_v2=True, + enable_ipv6=True, + disable_ipv4=True, + ) + + endpoint = PeerEndpoint.parse('tcp://127.0.0.1:8005') + manager.connections.connect_to_endpoint(endpoint, use_ssl=True) + + self.assertEqual(0, len(list(manager.connections.iter_not_ready_endpoints()))) + self.assertEqual(0, len(list(manager.connections.iter_ready_connections()))) + self.assertEqual(0, len(list(manager.connections.iter_all_connections()))) diff --git a/tests/p2p/test_connectivity.py b/tests/p2p/test_connectivity.py new file mode 100644 index 000000000..328a17050 --- /dev/null +++ b/tests/p2p/test_connectivity.py @@ -0,0 +1,56 @@ +import time +import urllib +from contextlib import contextmanager +from typing import Generator + +import requests + +from tests.utils import run_server + + +@contextmanager +def _run_servers_context(count: int) -> Generator[list[tuple[str, str]], None, None]: + """ Runs `count` number of `test.utils.run_server` that bootstrap in chain, yields a (endpoint, status_url) list. 
+ """ + if count > 80: + raise ValueError('cannot start more than 80 processes at once') + start_port = 8005 + endpoint_and_status_urls = [] + processes = [] + try: + previous_endpoint: None | str = None + for listen_port in range(start_port, start_port + count): + status_port = listen_port + 80 + endpoint = f'tcp://127.0.0.1:{listen_port}' + status_url = f'http://127.0.0.1:{status_port}' + # XXX: it's important for run_server to be inside the try because if it fails it will still terminate the + # ones that were previously started because they would have made it into the processes list + processes.append(run_server(listen=listen_port, status=status_port, bootstrap=previous_endpoint)) + endpoint_and_status_urls.append((endpoint, status_url)) + previous_endpoint = endpoint + yield endpoint_and_status_urls + finally: + for process in processes: + # XXX: this assumes process.terminate() will not fail + process.terminate() + + +def test_manager_connection_transitivity() -> None: + """ Creates a chain of 4 peers that bootstrap to the previous one, they should all connect to each other. 
+ """ + with _run_servers_context(4) as endpoint_status_pairs: + assert len(endpoint_status_pairs) == 4 + time.sleep(1) # 1 sec should be more than enough for the peers to connect to themselves + + statuses = [ + requests.get(urllib.parse.urljoin(status_url, '/v1a/status/')).json() + for _, status_url in endpoint_status_pairs + ] + + all_peer_ids = set(status['server']['id'] for status in statuses) + + for status in statuses: + peer_id = status['server']['id'] + all_other_peer_ids = all_peer_ids - {peer_id} + connected_peer_ids = {i['id'] for i in status['connections']['connected_peers']} + assert all_other_peer_ids == connected_peer_ids diff --git a/tests/p2p/test_entrypoint.py b/tests/p2p/test_entrypoint.py new file mode 100644 index 000000000..718ca4bdc --- /dev/null +++ b/tests/p2p/test_entrypoint.py @@ -0,0 +1,42 @@ +from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint, Protocol +from tests import unittest + + +class EntrypointTestCase(unittest.TestCase): + def test_is_ipv6(self) -> None: + valid_addresses = [ + '::', + '::1', + '2001:0db8:85a3:0000:0000:8a2e:0370:7334', + '2001:db8:85a3:0:0:8a2e:370:7334', + '2001:db8::8a2e:370:7334', + '2001:db8:0:0:0:0:2:1', + '1234::5678', + 'fe80::', + '::abcd:abcd:abcd:abcd:abcd:abcd', + '0:0:0:0:0:0:0:1', + '0:0:0:0:0:0:0:0' + ] + + invalid_addresses = [ + '127.0.0.1', + '1200::AB00:1234::2552:7777:1313', + '2001:db8::g123', + '2001:db8::85a3::7334', + '2001:db8:85a3:0000:0000:8a2e:0370:7334:1234', + '12345::abcd', + '2001:db8:85a3:8a2e:0370', + '2001:db8:85a3::8a2e:3707334', + '1234:56789::abcd', + ':2001:db8::1', + '2001:db8::1:', + '2001::85a3::8a2e:370:7334' + ] + + for address in valid_addresses: + peer_address = PeerAddress(Protocol.TCP, address, 40403) + self.assertTrue(PeerEndpoint(peer_address).addr.is_ipv6()) + + for address in invalid_addresses: + peer_address = PeerAddress(Protocol.TCP, address, 40403) + self.assertFalse(PeerEndpoint(peer_address).addr.is_ipv6()) diff --git 
a/tests/p2p/test_peer_id.py b/tests/p2p/test_peer_id.py index 56dfaf79b..75030843b 100644 --- a/tests/p2p/test_peer_id.py +++ b/tests/p2p/test_peer_id.py @@ -72,7 +72,7 @@ def test_sign_verify_fail(self) -> None: def test_merge_peer(self) -> None: # Testing peer storage with merge of peers - peer_storage = VerifiedPeerStorage() + peer_storage = VerifiedPeerStorage(rng=self.rng, max_size=100) p1 = PrivatePeer.auto_generated() p2 = PrivatePeer.auto_generated() @@ -86,19 +86,19 @@ def test_merge_peer(self) -> None: peer = peer_storage[p1.id] self.assertEqual(peer.id, p1.id) self.assertEqual(peer.public_key, p1.public_key) - self.assertEqual(peer.info.entrypoints, []) + self.assertEqual(peer.info.entrypoints, set()) ep1 = PeerAddress.parse('tcp://127.0.0.1:1001') ep2 = PeerAddress.parse('tcp://127.0.0.1:1002') ep3 = PeerAddress.parse('tcp://127.0.0.1:1003') p3 = PrivatePeer.auto_generated().to_public_peer() - p3.info.entrypoints.append(ep1) - p3.info.entrypoints.append(ep2) + p3.info.entrypoints.add(ep1) + p3.info.entrypoints.add(ep2) p4 = PublicPeer(UnverifiedPeer(id=p3.id), public_key=p3.public_key) - p4.info.entrypoints.append(ep2) - p4.info.entrypoints.append(ep3) + p4.info.entrypoints.add(ep2) + p4.info.entrypoints.add(ep3) peer_storage.add_or_merge(p4) self.assertEqual(len(peer_storage), 2) @@ -213,16 +213,16 @@ def test_unverified_peer_to_json_roundtrip(self) -> None: peer_json_simple = dict( id=str(peer_id), - entrypoints=[addr1, addr2, addr3] + entrypoints=sorted({addr1, addr2, addr3}) ) result = UnverifiedPeer.create_from_json(peer_json_simple) assert result.id == peer_id - assert result.info.entrypoints == [ + assert result.info.entrypoints == { PeerAddress.parse(addr1), PeerAddress.parse(addr2), PeerAddress.parse(addr3), - ] + } assert result.to_json() == peer_json_simple # We support this for compatibility with old peers that may send ids in the URLs @@ -237,11 +237,11 @@ def test_unverified_peer_to_json_roundtrip(self) -> None: result = 
UnverifiedPeer.create_from_json(peer_json_with_ids) assert result.id == peer_id - assert result.info.entrypoints == [ + assert result.info.entrypoints == { PeerAddress.parse(addr1), PeerAddress.parse(addr2), PeerAddress.parse(addr3), - ] + } assert result.to_json() == peer_json_simple # the roundtrip erases the ids from the URLs other_peer_id = PrivatePeer.auto_generated().id @@ -276,6 +276,10 @@ async def test_validate_entrypoint(self) -> None: peer.info.entrypoints = [PeerAddress.parse('tcp://uri_name:40403')] result = await peer.info.validate_entrypoint(protocol) self.assertTrue(result) + # if entrypoint is an IPv6 + peer.entrypoints = [PeerEndpoint.parse('tcp://[::1]:40403')] + result = await peer.info.validate_entrypoint(protocol) + self.assertTrue(result) # test invalid. DNS in test mode will resolve to '127.0.0.1:40403' protocol.entrypoint = PeerEndpoint.parse('tcp://45.45.45.45:40403') result = await peer.info.validate_entrypoint(protocol) @@ -298,6 +302,10 @@ def getPeer(self) -> DummyPeer: peer.info.entrypoints = [PeerAddress.parse('tcp://uri_name:40403')] result = await peer.info.validate_entrypoint(protocol) self.assertTrue(result) + # if entrypoint is an IPv6 + peer.entrypoints = [PeerEndpoint.parse('tcp://[2001:db8::ff00:42:8329]:40403')] + result = await peer.info.validate_entrypoint(protocol) + self.assertTrue(result) class SyncV1PeerIdTest(unittest.SyncV1Params, BasePeerIdTest): diff --git a/tests/p2p/test_peer_storage.py b/tests/p2p/test_peer_storage.py new file mode 100644 index 000000000..8fbbad4f6 --- /dev/null +++ b/tests/p2p/test_peer_storage.py @@ -0,0 +1,30 @@ +import pytest + +from hathor.p2p.peer_storage import UnverifiedPeerStorage, VerifiedPeerStorage +from hathor.util import Random +from tests.unittest import PEER_ID_POOL + + +@pytest.fixture +def rng() -> Random: + import secrets + seed = secrets.randbits(64) + return Random(seed) + + +def test_unverified_peer_storage_max_size(rng: Random) -> None: + max_size = 5 + peer_storage = 
UnverifiedPeerStorage(rng=rng, max_size=max_size) + for i in range(2 * max_size): + peer = PEER_ID_POOL[i].to_unverified_peer() + peer_storage.add(peer) + assert len(peer_storage) == max_size + + +def test_verified_peer_storage_max_size(rng: Random) -> None: + max_size = 5 + peer_storage = VerifiedPeerStorage(rng=rng, max_size=max_size) + for i in range(2 * max_size): + peer = PEER_ID_POOL[i].to_public_peer() + peer_storage.add(peer) + assert len(peer_storage) == max_size diff --git a/tests/p2p/test_protocol.py b/tests/p2p/test_protocol.py index 841a45929..7a054b578 100644 --- a/tests/p2p/test_protocol.py +++ b/tests/p2p/test_protocol.py @@ -3,6 +3,7 @@ from unittest.mock import Mock, patch from twisted.internet import defer +from twisted.internet.address import IPv4Address from twisted.internet.protocol import Protocol from twisted.python.failure import Failure @@ -10,7 +11,7 @@ from hathor.p2p.manager import ConnectionsManager from hathor.p2p.messages import ProtocolMessages from hathor.p2p.peer import PrivatePeer -from hathor.p2p.peer_endpoint import PeerAddress +from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint from hathor.p2p.protocol import HathorLineReceiver, HathorProtocol from hathor.simulator import FakeConnection from hathor.util import json_dumps, json_loadb @@ -77,8 +78,8 @@ def test_on_connect(self) -> None: def test_peer_with_entrypoint(self) -> None: entrypoint_str = 'tcp://192.168.1.1:54321' entrypoint = PeerAddress.parse(entrypoint_str) - self.peer1.info.entrypoints.append(entrypoint) - self.peer2.info.entrypoints.append(entrypoint) + self.peer1.info.entrypoints.add(entrypoint) + self.peer2.info.entrypoints.add(entrypoint) self.conn.run_one_step() # HELLO msg1 = self.conn.peek_tr1_value() @@ -201,6 +202,87 @@ def test_valid_hello(self) -> None: self.assertFalse(self.conn.tr1.disconnecting) self.assertFalse(self.conn.tr2.disconnecting) + def test_hello_without_ipv6_capability(self) -> None: + """Tests the connection between peers with 
and without the IPV6 capability. + Expected behavior: the entrypoint with IPV6 is not relayed. + """ + network = 'testnet' + manager1 = self.create_peer( + network, + peer=self.peer1, + capabilities=[self._settings.CAPABILITY_IPV6, self._settings.CAPABILITY_SYNC_VERSION] + ) + manager2 = self.create_peer( + network, + peer=self.peer2, + capabilities=[self._settings.CAPABILITY_SYNC_VERSION] + ) + + port1 = FakeConnection._get_port(manager1) + port2 = FakeConnection._get_port(manager2) + + addr1 = IPv4Address('TCP', '192.168.1.1', port1) + addr2 = IPv4Address('TCP', '192.168.1.1', port2) + + entrypoint_1_ipv6 = PeerEndpoint.parse('tcp://[::1]:54321') + entrypoint_1_ipv4 = PeerEndpoint.parse(f'tcp://192.168.1.1:{port1}') + entrypoint_2_ipv4 = PeerEndpoint.parse(f'tcp://192.168.1.1:{port2}') + + self.peer1.info.entrypoints.add(entrypoint_1_ipv6.addr) + self.peer1.info.entrypoints.add(entrypoint_1_ipv4.addr) + self.peer2.info.entrypoints.add(entrypoint_2_ipv4.addr) + + conn = FakeConnection(manager1, manager2, addr1=addr1, addr2=addr2) + + conn.run_one_step() # HELLO + conn.run_one_step() # PEER-ID + + self.assertEqual(len(conn.proto1.peer.info.entrypoints), 1) + self.assertEqual(len(conn.proto2.peer.info.entrypoints), 1) + self.assertEqual(next(iter(conn.proto1.peer.info.entrypoints)).host, '192.168.1.1') + self.assertEqual(next(iter(conn.proto2.peer.info.entrypoints)).host, '192.168.1.1') + + def test_hello_with_ipv6_capability(self) -> None: + """Tests the connection between peers with the IPV6 capability. + Expected behavior: the entrypoint with IPV6 is relayed. 
+ """ + network = 'testnet' + manager1 = self.create_peer( + network, + peer=self.peer1, + capabilities=[self._settings.CAPABILITY_IPV6, self._settings.CAPABILITY_SYNC_VERSION] + ) + manager2 = self.create_peer( + network, + peer=self.peer2, + capabilities=[self._settings.CAPABILITY_IPV6, self._settings.CAPABILITY_SYNC_VERSION] + ) + + port1 = FakeConnection._get_port(manager1) + port2 = FakeConnection._get_port(manager2) + + addr1 = IPv4Address('TCP', '192.168.1.1', port1) + addr2 = IPv4Address('TCP', '192.168.1.1', port2) + + entrypoint_1_ipv6 = PeerEndpoint.parse('tcp://[::1]:54321') + entrypoint_1_ipv4 = PeerEndpoint.parse(f'tcp://192.168.1.1:{port1}') + entrypoint_2_ipv4 = PeerEndpoint.parse(f'tcp://192.168.1.1:{port2}') + + self.peer1.info.entrypoints.add(entrypoint_1_ipv6.addr) + self.peer1.info.entrypoints.add(entrypoint_1_ipv4.addr) + self.peer2.info.entrypoints.add(entrypoint_2_ipv4.addr) + + conn = FakeConnection(manager1, manager2, addr1=addr1, addr2=addr2) + + conn.run_one_step() # HELLO + conn.run_one_step() # PEER-ID + + self.assertEqual(len(conn.proto1.peer.info.entrypoints), 1) + self.assertEqual(len(conn.proto2.peer.info.entrypoints), 2) + self.assertTrue('::1' in map(lambda x: x.host, conn.proto2.peer.info.entrypoints)) + self.assertTrue('192.168.1.1' in map(lambda x: x.host, conn.proto2.peer.info.entrypoints)) + self.assertEqual(next(iter(conn.proto1.peer.info.entrypoints)).host, '192.168.1.1') + def test_invalid_same_peer_id(self) -> None: manager3 = self.create_peer(self.network, peer=self.peer1) conn = FakeConnection(self.manager1, manager3) diff --git a/tests/poa/test_poa_simulation.py b/tests/poa/test_poa_simulation.py index b0b787f6e..096a93fd1 100644 --- a/tests/poa/test_poa_simulation.py +++ b/tests/poa/test_poa_simulation.py @@ -30,6 +30,7 @@ from hathor.crypto.util import get_address_b58_from_public_key_bytes, get_public_key_bytes_compressed from hathor.manager import HathorManager from hathor.simulator import FakeConnection +from 
hathor.simulator.trigger import StopWhenTrue from hathor.transaction import BaseTransaction, Block, TxInput, TxOutput from hathor.transaction.genesis import generate_new_genesis from hathor.transaction.poa import PoaBlock @@ -156,8 +157,8 @@ def test_two_producers(self) -> None: connection = FakeConnection(manager1, manager2) self.simulator.add_connection(connection) - # both managers are producing blocks - self.simulator.run(100) + trigger = StopWhenTrue(lambda: manager2.tx_storage.get_block_count() == 12) + assert self.simulator.run(200, trigger=trigger) assert manager1.tx_storage.get_block_count() == 12 assert manager2.tx_storage.get_block_count() == 12 assert manager1.tx_storage.get_best_block_tips() == manager2.tx_storage.get_best_block_tips() diff --git a/tests/resources/p2p/test_status.py b/tests/resources/p2p/test_status.py index 646ba6903..e7c322f74 100644 --- a/tests/resources/p2p/test_status.py +++ b/tests/resources/p2p/test_status.py @@ -18,13 +18,13 @@ def setUp(self): super().setUp() self.web = StubSite(StatusResource(self.manager)) address1 = IPv4Address('TCP', '192.168.1.1', 54321) - self.manager.connections.my_peer.info.entrypoints.append(PeerAddress.from_address(address1)) + self.manager.connections.my_peer.info.entrypoints.add(PeerAddress.from_address(address1)) self.manager.peers_whitelist.append(self.get_random_peer_from_pool().id) self.manager.peers_whitelist.append(self.get_random_peer_from_pool().id) self.manager2 = self.create_peer('testnet') address2 = IPv4Address('TCP', '192.168.1.1', 54322) - self.manager2.connections.my_peer.info.entrypoints.append(PeerAddress.from_address(address2)) + self.manager2.connections.my_peer.info.entrypoints.add(PeerAddress.from_address(address2)) self.conn1 = FakeConnection(self.manager, self.manager2, addr1=address1, addr2=address2) @inlineCallbacks diff --git a/tests/unittest.py b/tests/unittest.py index 94cef1c34..4a659f6bd 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -226,7 +226,9 @@ def 
create_peer( # type: ignore[no-untyped-def] pubsub: PubSubManager | None = None, event_storage: EventStorage | None = None, enable_event_queue: bool | None = None, - use_memory_storage: bool | None = None + use_memory_storage: bool | None = None, + enable_ipv6: bool = False, + disable_ipv4: bool = False, ): # TODO: Add -> HathorManager here. It breaks the lint in a lot of places. enable_sync_v1, enable_sync_v2 = self._syncVersionFlags(enable_sync_v1, enable_sync_v2) @@ -290,6 +292,15 @@ def create_peer( # type: ignore[no-untyped-def] if utxo_index: builder.enable_utxo_index() + if capabilities is not None: + builder.set_capabilities(capabilities) + + if enable_ipv6: + builder.enable_ipv6() + + if disable_ipv4: + builder.disable_ipv4() + daa = DifficultyAdjustmentAlgorithm(settings=self._settings, test_mode=TestMode.TEST_ALL_WEIGHT) builder.set_daa(daa) manager = self.create_peer_from_builder(builder, start_manager=start_manager)