Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 76 additions & 72 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import os
import platform
import sys
from argparse import Namespace
from typing import Optional
from typing import Any, Optional

from structlog import get_logger
from twisted.internet.posixbase import PosixReactorBase

from hathor.cli.run_node import RunNodeArgs
from hathor.consensus import ConsensusAlgorithm
from hathor.event import EventManager
from hathor.exception import BuilderError
Expand All @@ -44,15 +44,16 @@ class CliBuilder:

TODO Refactor to use Builder. It could even be ported to a Builder.from_args classmethod.
"""
def __init__(self, args: RunNodeArgs) -> None:
    """Keep the parsed CLI arguments and create a fresh bound logger."""
    self._args = args
    self.log = logger.new()

def check_or_raise(self, condition: bool, message: str) -> None:
    """Raise a BuilderError carrying `message` if `condition` is falsy.

    NOTE(review): the previous docstring claimed this "will exit printing
    `message`", but the code neither prints nor exits — it raises
    BuilderError and lets the caller decide how to report/terminate.
    """
    if not condition:
        raise BuilderError(message)

def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorManager:
def create_manager(self, reactor: PosixReactorBase) -> HathorManager:
import hathor
from hathor.conf import HathorSettings
from hathor.conf.get_settings import get_settings_source
Expand All @@ -79,7 +80,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
self.log = logger.new()
self.reactor = reactor

peer_id = self.create_peer_id(args)
peer_id = self.create_peer_id()

python = f'{platform.python_version()}-{platform.python_implementation()}'

Expand All @@ -100,61 +101,62 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
self.rocksdb_storage: Optional[RocksDBStorage] = None
self.event_ws_factory: Optional[EventWebsocketFactory] = None

if args.memory_storage:
self.check_or_raise(not args.data, '--data should not be used with --memory-storage')
if self._args.memory_storage:
self.check_or_raise(not self._args.data, '--data should not be used with --memory-storage')
# if using MemoryStorage, no need to have cache
indexes = MemoryIndexesManager()
tx_storage = TransactionMemoryStorage(indexes)
event_storage = EventMemoryStorage()
self.check_or_raise(not args.x_rocksdb_indexes, 'RocksDB indexes require RocksDB data')
self.check_or_raise(not self._args.x_rocksdb_indexes, 'RocksDB indexes require RocksDB data')
self.log.info('with storage', storage_class=type(tx_storage).__name__)
else:
self.check_or_raise(args.data, '--data is expected')
if args.rocksdb_storage:
self.check_or_raise(bool(self._args.data), '--data is expected')
assert self._args.data is not None
if self._args.rocksdb_storage:
self.log.warn('--rocksdb-storage is now implied, no need to specify it')
cache_capacity = args.rocksdb_cache
self.rocksdb_storage = RocksDBStorage(path=args.data, cache_capacity=cache_capacity)
cache_capacity = self._args.rocksdb_cache
self.rocksdb_storage = RocksDBStorage(path=self._args.data, cache_capacity=cache_capacity)

# Initialize indexes manager.
if args.memory_indexes:
if self._args.memory_indexes:
indexes = MemoryIndexesManager()
else:
indexes = RocksDBIndexesManager(self.rocksdb_storage)

kwargs = {}
if not args.cache:
if not self._args.cache:
# We should only pass indexes if cache is disabled. Otherwise,
# only TransactionCacheStorage should have indexes.
kwargs['indexes'] = indexes
tx_storage = TransactionRocksDBStorage(self.rocksdb_storage, **kwargs)
event_storage = EventRocksDBStorage(self.rocksdb_storage)

self.log.info('with storage', storage_class=type(tx_storage).__name__, path=args.data)
if args.cache:
self.check_or_raise(not args.memory_storage, '--cache should not be used with --memory-storage')
self.log.info('with storage', storage_class=type(tx_storage).__name__, path=self._args.data)
if self._args.cache:
self.check_or_raise(not self._args.memory_storage, '--cache should not be used with --memory-storage')
tx_storage = TransactionCacheStorage(tx_storage, reactor, indexes=indexes)
if args.cache_size:
tx_storage.capacity = args.cache_size
if args.cache_interval:
tx_storage.interval = args.cache_interval
if self._args.cache_size:
tx_storage.capacity = self._args.cache_size
if self._args.cache_interval:
tx_storage.interval = self._args.cache_interval
self.log.info('with cache', capacity=tx_storage.capacity, interval=tx_storage.interval)
self.tx_storage = tx_storage
self.log.info('with indexes', indexes_class=type(tx_storage.indexes).__name__)

self.wallet = None
if args.wallet:
self.wallet = self.create_wallet(args)
self.log.info('with wallet', wallet=self.wallet, path=args.data)
if self._args.wallet:
self.wallet = self.create_wallet()
self.log.info('with wallet', wallet=self.wallet, path=self._args.data)

hostname = self.get_hostname(args)
hostname = self.get_hostname()
network = settings.NETWORK_NAME
enable_sync_v1 = args.x_enable_legacy_sync_v1_0
enable_sync_v1_1 = not args.x_sync_v2_only
enable_sync_v2 = args.x_sync_v2_only or args.x_sync_bridge
enable_sync_v1 = self._args.x_enable_legacy_sync_v1_0
enable_sync_v1_1 = not self._args.x_sync_v2_only
enable_sync_v2 = self._args.x_sync_v2_only or self._args.x_sync_bridge

pubsub = PubSubManager(reactor)

if args.x_enable_event_queue:
if self._args.x_enable_event_queue:
self.event_ws_factory = EventWebsocketFactory(reactor, event_storage)

event_manager = EventManager(
Expand All @@ -164,24 +166,26 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
reactor=reactor
)

if args.wallet_index and tx_storage.indexes is not None:
if self._args.wallet_index and tx_storage.indexes is not None:
self.log.debug('enable wallet indexes')
self.enable_wallet_index(tx_storage.indexes, pubsub)

if args.utxo_index and tx_storage.indexes is not None:
if self._args.utxo_index and tx_storage.indexes is not None:
self.log.debug('enable utxo index')
tx_storage.indexes.enable_utxo_index()

full_verification = False
if args.x_full_verification:
self.check_or_raise(not args.x_enable_event_queue, '--x-full-verification cannot be used with '
'--x-enable-event-queue')
if self._args.x_full_verification:
self.check_or_raise(
not self._args.x_enable_event_queue,
'--x-full-verification cannot be used with --x-enable-event-queue'
)
full_verification = True

soft_voided_tx_ids = set(settings.SOFT_VOIDED_TX_IDS)
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub=pubsub)

if args.x_enable_event_queue:
if self._args.x_enable_event_queue:
self.log.info('--x-enable-event-queue flag provided. '
'The events detected by the full node will be stored and can be retrieved by clients')

Expand Down Expand Up @@ -210,59 +214,59 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
event_manager=event_manager,
wallet=self.wallet,
checkpoints=settings.CHECKPOINTS,
environment_info=get_environment_info(args=str(args), peer_id=peer_id.id),
environment_info=get_environment_info(args=str(self._args), peer_id=peer_id.id),
full_verification=full_verification,
enable_event_queue=args.x_enable_event_queue
enable_event_queue=self._args.x_enable_event_queue
)

p2p_manager.set_manager(self.manager)

if args.stratum:
if self._args.stratum:
stratum_factory = StratumFactory(self.manager)
self.manager.stratum_factory = stratum_factory
self.manager.metrics.stratum_factory = stratum_factory

if args.data:
self.manager.set_cmd_path(args.data)
if self._args.data:
self.manager.set_cmd_path(self._args.data)

if args.allow_mining_without_peers:
if self._args.allow_mining_without_peers:
self.manager.allow_mining_without_peers()

if args.x_localhost_only:
if self._args.x_localhost_only:
self.manager.connections.localhost_only = True

dns_hosts = []
if settings.BOOTSTRAP_DNS:
dns_hosts.extend(settings.BOOTSTRAP_DNS)

if args.dns:
dns_hosts.extend(args.dns)
if self._args.dns:
dns_hosts.extend(self._args.dns)

if dns_hosts:
self.manager.add_peer_discovery(DNSPeerDiscovery(dns_hosts))

if args.bootstrap:
self.manager.add_peer_discovery(BootstrapPeerDiscovery(args.bootstrap))
if self._args.bootstrap:
self.manager.add_peer_discovery(BootstrapPeerDiscovery(self._args.bootstrap))

if args.test_mode_tx_weight:
if self._args.test_mode_tx_weight:
_set_test_mode(TestMode.TEST_TX_WEIGHT)
if self.wallet:
self.wallet.test_mode = True

if args.x_rocksdb_indexes:
if self._args.x_rocksdb_indexes:
self.log.warn('--x-rocksdb-indexes is now the default, no need to specify it')
if args.memory_indexes:
if self._args.memory_indexes:
raise BuilderError('You cannot use --memory-indexes and --x-rocksdb-indexes.')

if args.memory_indexes and args.memory_storage:
if self._args.memory_indexes and self._args.memory_storage:
self.log.warn('--memory-indexes is implied for memory storage or JSON storage')

for description in args.listen:
for description in self._args.listen:
self.manager.add_listen_address(description)

if args.peer_id_blacklist:
self.log.info('with peer id blacklist', blacklist=args.peer_id_blacklist)
add_peer_id_blacklist(args.peer_id_blacklist)
if self._args.peer_id_blacklist:
self.log.info('with peer id blacklist', blacklist=self._args.peer_id_blacklist)
add_peer_id_blacklist(self._args.peer_id_blacklist)

return self.manager

Expand All @@ -271,13 +275,13 @@ def enable_wallet_index(self, indexes: IndexesManager, pubsub: PubSubManager) ->
indexes.enable_address_index(pubsub)
indexes.enable_tokens_index()

def get_hostname(self, args: Namespace) -> str:
if args.hostname and args.auto_hostname:
def get_hostname(self) -> Optional[str]:
if self._args.hostname and self._args.auto_hostname:
print('You cannot use --hostname and --auto-hostname together.')
sys.exit(-1)

if not args.auto_hostname:
hostname = args.hostname
if not self._args.auto_hostname:
hostname = self._args.hostname
else:
print('Trying to discover your hostname...')
hostname = discover_hostname()
Expand All @@ -288,38 +292,38 @@ def get_hostname(self, args: Namespace) -> str:
print('Hostname discovered and set to {}'.format(hostname))
return hostname

def create_peer_id(self) -> PeerId:
    """Return this node's PeerId.

    If `--peer` was given, load the identity from that JSON file;
    otherwise generate a brand-new PeerId.
    """
    if not self._args.peer:
        return PeerId()
    # Use a context manager so the file descriptor is closed promptly;
    # the original `json.load(open(...))` leaked the handle until GC.
    with open(self._args.peer, 'r') as f:
        data = json.load(f)
    return PeerId.create_from_json(data)

def create_wallet(self, args: Namespace) -> BaseWallet:
if args.wallet == 'hd':
kwargs = {
'words': args.words,
def create_wallet(self) -> BaseWallet:
if self._args.wallet == 'hd':
kwargs: dict[str, Any] = {
'words': self._args.words,
}

if args.passphrase:
if self._args.passphrase:
wallet_passphrase = getpass.getpass(prompt='HD Wallet passphrase:')
kwargs['passphrase'] = wallet_passphrase.encode()

if args.data:
kwargs['directory'] = args.data
if self._args.data:
kwargs['directory'] = self._args.data

return HDWallet(**kwargs)
elif args.wallet == 'keypair':
elif self._args.wallet == 'keypair':
print('Using KeyPairWallet')
if args.data:
wallet = Wallet(directory=args.data)
if self._args.data:
wallet = Wallet(directory=self._args.data)
else:
wallet = Wallet()

wallet.flush_to_disk_interval = 5 # seconds

if args.unlock_wallet:
if self._args.unlock_wallet:
wallet_passwd = getpass.getpass(prompt='Wallet password:')
wallet.unlock(wallet_passwd.encode())

Expand Down
Loading