diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index a5954880e..137cbd2f8 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -30,6 +30,7 @@ from hathor.p2p.peer_id import PeerId from hathor.pubsub import PubSubManager from hathor.storage import RocksDBStorage +from hathor.stratum import StratumFactory from hathor.transaction.storage import TransactionMemoryStorage, TransactionRocksDBStorage, TransactionStorage from hathor.util import Random, Reactor, get_environment_info from hathor.wallet import BaseWallet, Wallet @@ -56,6 +57,7 @@ class BuildArtifacts(NamedTuple): indexes: Optional[IndexesManager] wallet: Optional[BaseWallet] rocksdb_storage: Optional[RocksDBStorage] + stratum_factory: Optional[StratumFactory] class Builder: @@ -109,7 +111,7 @@ def __init__(self) -> None: self._enable_sync_v1: Optional[bool] = None self._enable_sync_v2: Optional[bool] = None - self._stratum_port: Optional[int] = None + self._enable_stratum_server: Optional[bool] = None self._full_verification: Optional[bool] = None @@ -152,9 +154,6 @@ def build(self) -> BuildArtifacts: if self._enable_sync_v2 is not None: kwargs['enable_sync_v2'] = self._enable_sync_v2 - if self._stratum_port is not None: - kwargs['stratum_port'] = self._stratum_port - if self._network is None: raise TypeError('you must set a network') @@ -180,6 +179,10 @@ def build(self) -> BuildArtifacts: **kwargs ) + stratum_factory: Optional[StratumFactory] = None + if self._enable_stratum_server: + stratum_factory = self._create_stratum_server(manager) + self.artifacts = BuildArtifacts( peer_id=peer_id, settings=settings, @@ -193,6 +196,7 @@ def build(self) -> BuildArtifacts: indexes=indexes, wallet=wallet, rocksdb_storage=self._rocksdb_storage, + stratum_factory=stratum_factory, ) return self.artifacts @@ -252,6 +256,12 @@ def _get_or_create_pubsub(self) -> PubSubManager: self._pubsub = PubSubManager(self._get_reactor()) return self._pubsub + def _create_stratum_server(self, manager: HathorManager) -> StratumFactory: + stratum_factory = StratumFactory(manager=manager) + manager.stratum_factory = stratum_factory + manager.metrics.stratum_factory = stratum_factory + return stratum_factory + def _get_or_create_rocksdb_storage(self) -> RocksDBStorage: assert self._rocksdb_path is not None @@ -363,9 +373,9 @@ def enable_keypair_wallet(self, directory: str, *, unlock: Optional[bytes] = Non self._wallet_unlock = unlock return self - def enable_stratum_server(self, port: int) -> 'Builder': + def enable_stratum_server(self) -> 'Builder': self.check_if_can_modify() - self._stratum_port = port + self._enable_stratum_server = True return self def enable_address_index(self) -> 'Builder': diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index 39ff6b68b..25628f59a 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -31,6 +31,7 @@ from hathor.p2p.peer_id import PeerId from hathor.p2p.utils import discover_hostname from hathor.pubsub import PubSubManager +from hathor.stratum import StratumFactory from hathor.wallet import BaseWallet, HDWallet, Wallet logger = get_logger() @@ -176,7 +177,6 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa tx_storage=tx_storage, event_manager=event_manager, wallet=self.wallet, - stratum_port=args.stratum, ssl=True, checkpoints=settings.CHECKPOINTS, enable_sync_v1=enable_sync_v1, @@ -188,6 +188,11 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa enable_event_queue=bool(args.x_enable_event_queue) ) + if args.stratum: + stratum_factory = StratumFactory(self.manager) + self.manager.stratum_factory = stratum_factory + self.manager.metrics.stratum_factory = stratum_factory + if args.data: self.manager.set_cmd_path(args.data) diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py index c8ea2af04..fc0bf640f 100644 --- a/hathor/cli/run_node.py +++ b/hathor/cli/run_node.py @@ -137,9 +137,14 @@ def prepare(self, args: Namespace, *, register_resources: bool = True) -> None: except BuilderError as err: self.log.error(str(err)) sys.exit(2) + self.tx_storage = self.manager.tx_storage self.wallet = self.manager.wallet self.start_manager(args) + + if args.stratum: + self.reactor.listenTCP(args.stratum, self.manager.stratum_factory) + if register_resources: resources_builder = ResourcesBuilder(self.manager, builder.event_ws_factory) status_server = resources_builder.build(args) @@ -163,6 +168,7 @@ def prepare(self, args: Namespace, *, register_resources: bool = True) -> None: indexes=self.manager.tx_storage.indexes, wallet=self.manager.wallet, rocksdb_storage=getattr(builder, 'rocksdb_storage', None), + stratum_factory=self.manager.stratum_factory, ) def start_sentry_if_possible(self, args: Namespace) -> None: diff --git a/hathor/manager.py b/hathor/manager.py index b608f4386..0479bf447 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -45,6 +45,7 @@ from hathor.p2p.protocol import HathorProtocol from hathor.profiler import get_cpu_profiler from hathor.pubsub import HathorEvents, PubSubManager +from hathor.stratum import StratumFactory from hathor.transaction import BaseTransaction, Block, MergeMinedBlock, Transaction, TxVersion, sum_weights from hathor.transaction.exceptions import TxValidationError from hathor.transaction.storage import TransactionStorage @@ -91,7 +92,6 @@ def __init__(self, network: str, hostname: Optional[str] = None, wallet: Optional[BaseWallet] = None, - stratum_port: Optional[int] = None, ssl: bool = True, enable_sync_v1: bool = False, enable_sync_v1_1: bool = True, @@ -113,9 +113,6 @@ def __init__(self, :param tx_storage: Required storage backend. :type tx_storage: :py:class:`hathor.transaction.storage.transaction_storage.TransactionStorage` - - :param stratum_port: Stratum server port. Stratum server will only be created if it is not None. - :type stratum_port: Optional[int] """ from hathor.metrics import Metrics from hathor.p2p.factory import HathorClientFactory, HathorServerFactory @@ -210,14 +207,9 @@ def __init__(self, self.wallet.pubsub = self.pubsub self.wallet.reactor = self.reactor - if stratum_port: - # XXX: only import if needed - from hathor.stratum import StratumFactory - self.stratum_factory: Optional[StratumFactory] = StratumFactory(manager=self, port=stratum_port) - else: - self.stratum_factory = None - # Set stratum factory for metrics object - self.metrics.stratum_factory = self.stratum_factory + # It will be inject later by the builder. + # XXX Remove this attribute after all dependencies are cleared. + self.stratum_factory: Optional[StratumFactory] = None self._allow_mining_without_peers = False diff --git a/hathor/stratum/stratum.py b/hathor/stratum/stratum.py index f50992d6f..c7acb0c6a 100644 --- a/hathor/stratum/stratum.py +++ b/hathor/stratum/stratum.py @@ -726,16 +726,14 @@ class StratumFactory(Factory): jobs: Set[UUID] manager: 'HathorManager' miner_protocols: Dict[UUID, StratumProtocol] - port: int tx_queue: List[bytes] mining_tx_pool: Dict[bytes, BaseTransaction] mined_txs: Dict[bytes, Transaction] deferreds_tx: Dict[bytes, Deferred] - def __init__(self, manager: 'HathorManager', port: int, reactor: Reactor = reactor): + def __init__(self, manager: 'HathorManager', reactor: Reactor = reactor): self.log = logger.new() self.manager = manager - self.port = port self.reactor = reactor self.jobs = set() @@ -769,13 +767,9 @@ def on_new_block(event: HathorEvents, args: EventArguments) -> None: self.update_jobs() self.manager.pubsub.subscribe(HathorEvents.NETWORK_NEW_TX_ACCEPTED, on_new_block) - # XXX: self.reactor is IReactorTime, which does not guarantee listenTCP method, normally it will have that - # method, but on tests we use a Clock instead, which does not have listenTCP, there shouldn't be any - # issues using the "default" reactor though - self._listen = reactor.listenTCP(self.port, self) def stop(self) -> Optional[Deferred]: - return self._listen.stopListening() + return None def mine_transaction(self, tx: Transaction, deferred: Deferred) -> None: """ diff --git a/tests/others/test_init_manager.py b/tests/others/test_init_manager.py index bdeb5f4e0..f6618b24e 100644 --- a/tests/others/test_init_manager.py +++ b/tests/others/test_init_manager.py @@ -61,7 +61,7 @@ def test_invalid_arguments(self): def tests_init_with_stratum(self): builder = TestBuilder() builder.set_tx_storage(self.tx_storage) - builder.enable_stratum_server(50505) + builder.enable_stratum_server() artifacts = builder.build() manager = artifacts.manager manager.start() diff --git a/tests/resources/test_stratum.py b/tests/resources/test_stratum.py index ee47bd35c..e73056a55 100644 --- a/tests/resources/test_stratum.py +++ b/tests/resources/test_stratum.py @@ -10,11 +10,6 @@ class StratumResourceTest(_BaseResourceTest._ResourceTest): - def _manager_kwargs(self): - kwargs = super()._manager_kwargs() - kwargs['stratum_port'] = 8123 - return kwargs - def setUp(self): super().setUp() self.web = StubSite(MiningStatsResource(self.manager)) diff --git a/tests/tx/test_stratum.py b/tests/tx/test_stratum.py index 0769c10af..42d5082df 100644 --- a/tests/tx/test_stratum.py +++ b/tests/tx/test_stratum.py @@ -38,8 +38,7 @@ def setUp(self): super().setUp() self.manager = self.create_peer('testnet') self.manager.allow_mining_without_peers() - port = self.rng.randint(8000, 9000) - self.factory = StratumFactory(self.manager, port=port, reactor=MemoryReactorHeapClock()) + self.factory = StratumFactory(self.manager, reactor=MemoryReactorHeapClock()) self.factory.start() self.protocol = self.factory.buildProtocol('127.0.0.1') self.transport = StringTransportWithDisconnection()