Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 49 additions & 14 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ def __init__(self) -> None:
self._enable_tokens_index: bool = False
self._enable_utxo_index: bool = False

self._enable_sync_v1: Optional[bool] = None
self._enable_sync_v2: Optional[bool] = None
self._enable_sync_v1: bool = False
self._enable_sync_v1_1: bool = True
self._enable_sync_v2: bool = False

self._enable_stratum_server: Optional[bool] = None

Expand All @@ -121,6 +122,9 @@ def build(self) -> BuildArtifacts:
if self.artifacts is not None:
raise ValueError('cannot call build twice')

if self._network is None:
raise TypeError('you must set a network')

settings = self._get_settings()
reactor = self._get_reactor()
pubsub = self._get_or_create_pubsub()
Expand All @@ -130,6 +134,8 @@ def build(self) -> BuildArtifacts:
soft_voided_tx_ids = self._get_soft_voided_tx_ids()
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub)

p2p_manager = self._get_p2p_manager()

wallet = self._get_or_create_wallet()
event_manager = self._get_or_create_event_manager()
tx_storage = self._get_or_create_tx_storage()
Expand All @@ -147,16 +153,6 @@ def build(self) -> BuildArtifacts:

kwargs: Dict[str, Any] = {}

if self._enable_sync_v1 is not None:
# XXX: the interface of the Builder was kept using v1 instead of v1_1 to minimize the changes needed
kwargs['enable_sync_v1_1'] = self._enable_sync_v1

if self._enable_sync_v2 is not None:
kwargs['enable_sync_v2'] = self._enable_sync_v2

if self._network is None:
raise TypeError('you must set a network')

if self._full_verification is not None:
kwargs['full_verification'] = self._full_verification

Expand All @@ -165,12 +161,13 @@ def build(self) -> BuildArtifacts:

manager = HathorManager(
reactor,
network=self._network,
pubsub=pubsub,
consensus_algorithm=consensus_algorithm,
peer_id=peer_id,
tx_storage=tx_storage,
p2p_manager=p2p_manager,
event_manager=event_manager,
network=self._network,
wallet=wallet,
rng=self._rng,
checkpoints=self._checkpoints,
Expand All @@ -179,6 +176,8 @@ def build(self) -> BuildArtifacts:
**kwargs
)

p2p_manager.set_manager(manager)

stratum_factory: Optional[StratumFactory] = None
if self._enable_stratum_server:
stratum_factory = self._create_stratum_server(manager)
Expand All @@ -189,7 +188,7 @@ def build(self) -> BuildArtifacts:
rng=self._rng,
reactor=reactor,
manager=manager,
p2p_manager=manager.connections,
p2p_manager=p2p_manager,
pubsub=pubsub,
consensus=consensus_algorithm,
tx_storage=tx_storage,
Expand Down Expand Up @@ -279,6 +278,27 @@ def _get_or_create_rocksdb_storage(self) -> RocksDBStorage:

return self._rocksdb_storage

def _get_p2p_manager(self) -> ConnectionsManager:
enable_ssl = True
reactor = self._get_reactor()
my_peer = self._get_peer_id()

assert self._network is not None

p2p_manager = ConnectionsManager(
reactor,
network=self._network,
my_peer=my_peer,
pubsub=self._get_or_create_pubsub(),
ssl=enable_ssl,
whitelist_only=False,
rng=self._rng,
enable_sync_v1=self._enable_sync_v1,
enable_sync_v1_1=self._enable_sync_v1_1,
enable_sync_v2=self._enable_sync_v2,
)
return p2p_manager

def _get_or_create_tx_storage(self) -> TransactionStorage:
if self._tx_storage is not None:
return self._tx_storage
Expand Down Expand Up @@ -435,6 +455,11 @@ def set_enable_sync_v1(self, enable_sync_v1: bool) -> 'Builder':
self._enable_sync_v1 = enable_sync_v1
return self

def set_enable_sync_v1_1(self, enable_sync_v1_1: bool) -> 'Builder':
self.check_if_can_modify()
self._enable_sync_v1_1 = enable_sync_v1_1
return self

def set_enable_sync_v2(self, enable_sync_v2: bool) -> 'Builder':
self.check_if_can_modify()
self._enable_sync_v2 = enable_sync_v2
Expand All @@ -450,6 +475,16 @@ def disable_sync_v1(self) -> 'Builder':
self._enable_sync_v1 = False
return self

def enable_sync_v1_1(self) -> 'Builder':
self.check_if_can_modify()
self._enable_sync_v1_1 = True
return self

def disable_sync_v1_1(self) -> 'Builder':
self.check_if_can_modify()
self._enable_sync_v1_1 = False
return self

def enable_sync_v2(self) -> 'Builder':
self.check_if_can_modify()
self._enable_sync_v2 = True
Expand Down
28 changes: 21 additions & 7 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
from hathor.exception import BuilderError
from hathor.indexes import IndexesManager
from hathor.manager import HathorManager
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer_id import PeerId
from hathor.p2p.utils import discover_hostname
from hathor.pubsub import PubSubManager
from hathor.stratum import StratumFactory
from hathor.util import Random
from hathor.wallet import BaseWallet, HDWallet, Wallet

logger = get_logger()
Expand Down Expand Up @@ -176,26 +178,38 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
self.log.info('--x-enable-event-queue flag provided. '
'The events detected by the full node will be stored and can be retrieved by clients')

self.manager = HathorManager(
p2p_manager = ConnectionsManager(
reactor,
network=network,
my_peer=peer_id,
pubsub=pubsub,
peer_id=peer_id,
ssl=True,
whitelist_only=False,
rng=Random(),
enable_sync_v1=enable_sync_v1,
enable_sync_v1_1=enable_sync_v1_1,
enable_sync_v2=enable_sync_v2,
)

self.manager = HathorManager(
reactor,
network=network,
hostname=hostname,
pubsub=pubsub,
consensus_algorithm=consensus_algorithm,
peer_id=peer_id,
tx_storage=tx_storage,
p2p_manager=p2p_manager,
event_manager=event_manager,
wallet=self.wallet,
ssl=True,
checkpoints=settings.CHECKPOINTS,
enable_sync_v1=enable_sync_v1,
enable_sync_v1_1=enable_sync_v1_1,
enable_sync_v2=enable_sync_v2,
consensus_algorithm=consensus_algorithm,
environment_info=get_environment_info(args=str(args), peer_id=peer_id.id),
full_verification=full_verification,
enable_event_queue=args.x_enable_event_queue
)

p2p_manager.set_manager(self.manager)

if args.stratum:
stratum_factory = StratumFactory(self.manager)
self.manager.stratum_factory = stratum_factory
Expand Down
31 changes: 3 additions & 28 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
SpendingVoidedError,
)
from hathor.mining import BlockTemplate, BlockTemplates
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer_discovery import PeerDiscovery
from hathor.p2p.peer_id import PeerId
from hathor.p2p.protocol import HathorProtocol
Expand Down Expand Up @@ -88,14 +89,11 @@ def __init__(self,
consensus_algorithm: ConsensusAlgorithm,
peer_id: PeerId,
tx_storage: TransactionStorage,
p2p_manager: ConnectionsManager,
event_manager: EventManager,
network: str,
hostname: Optional[str] = None,
wallet: Optional[BaseWallet] = None,
ssl: bool = True,
enable_sync_v1: bool = False,
enable_sync_v1_1: bool = True,
enable_sync_v2: bool = False,
capabilities: Optional[List[str]] = None,
checkpoints: Optional[List[Checkpoint]] = None,
rng: Optional[Random] = None,
Expand All @@ -108,28 +106,17 @@ def __init__(self,
:param network: Name of the network this node participates. Usually it is either testnet or mainnet.
:type network: string

:param hostname: The hostname of this node. It is used to generate its entrypoints.
:type hostname: string

:param tx_storage: Required storage backend.
:type tx_storage: :py:class:`hathor.transaction.storage.transaction_storage.TransactionStorage`
"""
from hathor.metrics import Metrics
from hathor.p2p.factory import HathorClientFactory, HathorServerFactory
from hathor.p2p.manager import ConnectionsManager

if not (enable_sync_v1 or enable_sync_v1_1 or enable_sync_v2):
raise TypeError(f'{type(self).__name__}() at least one sync version is required')

if event_manager.get_event_queue_state() is True and not enable_event_queue:
raise InitializationError(
'Cannot start manager without event queue feature, as it was enabled in the previous startup. '
'Either enable it, or use the reset-event-queue CLI command to remove all event-related data'
)

self._enable_sync_v1 = enable_sync_v1
self._enable_sync_v2 = enable_sync_v2

self._cmd_path: Optional[str] = None

self.log = logger.new()
Expand Down Expand Up @@ -177,23 +164,11 @@ def __init__(self,
self._event_manager.save_event_queue_state(enable_event_queue)
self._enable_event_queue = enable_event_queue

if enable_sync_v2:
assert self.tx_storage.indexes is not None
self.log.debug('enable sync-v2 indexes')
self.tx_storage.indexes.enable_deps_index()
self.tx_storage.indexes.enable_mempool_index()

self.consensus_algorithm = consensus_algorithm

self.peer_discoveries: List[PeerDiscovery] = []

self.ssl = ssl
self.server_factory = HathorServerFactory(self.network, self.my_peer, node=self, use_ssl=ssl)
self.client_factory = HathorClientFactory(self.network, self.my_peer, node=self, use_ssl=ssl)
self.connections = ConnectionsManager(self.reactor, self.my_peer, self.server_factory, self.client_factory,
self.pubsub, self, ssl, whitelist_only=False, rng=self.rng,
enable_sync_v1=enable_sync_v1, enable_sync_v2=enable_sync_v2,
enable_sync_v1_1=enable_sync_v1_1)
self.connections = p2p_manager

self.metrics = Metrics(
pubsub=self.pubsub,
Expand Down
23 changes: 8 additions & 15 deletions hathor/p2p/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,28 @@ class HathorServerFactory(protocol.ServerFactory):
"""

manager: Optional[ConnectionsManager]
protocol: Optional[Type[MyServerProtocol]] = MyServerProtocol
protocol: Type[MyServerProtocol] = MyServerProtocol

def __init__(
self,
network: str,
my_peer: PeerId,
connections: Optional[ConnectionsManager] = None,
p2p_manager: ConnectionsManager,
*,
node: 'HathorManager',
use_ssl: bool,
):
super().__init__()
self.network = network
self.my_peer = my_peer
self.connections = connections
self.node = node
self.p2p_manager = p2p_manager
self.use_ssl = use_ssl

def buildProtocol(self, addr: IAddress) -> MyServerProtocol:
assert self.protocol is not None
p = self.protocol(
network=self.network,
my_peer=self.my_peer,
connections=self.connections,
node=self.node,
p2p_manager=self.p2p_manager,
use_ssl=self.use_ssl,
inbound=True,
)
Expand All @@ -69,32 +66,28 @@ class HathorClientFactory(protocol.ClientFactory):
""" HathorClientFactory is used to generate HathorProtocol objects when we connected to another peer.
"""

manager: Optional[ConnectionsManager]
protocol: Optional[Type[MyClientProtocol]] = MyClientProtocol
protocol: Type[MyClientProtocol] = MyClientProtocol

def __init__(
self,
network: str,
my_peer: PeerId,
connections: Optional[ConnectionsManager] = None,
p2p_manager: ConnectionsManager,
*,
node: 'HathorManager',
use_ssl: bool,
):
super().__init__()
self.network = network
self.my_peer = my_peer
self.connections = connections
self.node = node
self.p2p_manager = p2p_manager
self.use_ssl = use_ssl

def buildProtocol(self, addr: IAddress) -> MyClientProtocol:
assert self.protocol is not None
p = self.protocol(
network=self.network,
my_peer=self.my_peer,
connections=self.connections,
node=self.node,
p2p_manager=self.p2p_manager,
use_ssl=self.use_ssl,
inbound=False,
)
Expand Down
Loading