Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from hathor.p2p.peer_id import PeerId
from hathor.pubsub import PubSubManager
from hathor.storage import RocksDBStorage
from hathor.stratum import StratumFactory
from hathor.transaction.storage import TransactionMemoryStorage, TransactionRocksDBStorage, TransactionStorage
from hathor.util import Random, Reactor, get_environment_info
from hathor.wallet import BaseWallet, Wallet
Expand All @@ -56,6 +57,7 @@ class BuildArtifacts(NamedTuple):
indexes: Optional[IndexesManager]
wallet: Optional[BaseWallet]
rocksdb_storage: Optional[RocksDBStorage]
stratum_factory: Optional[StratumFactory]


class Builder:
Expand Down Expand Up @@ -109,7 +111,7 @@ def __init__(self) -> None:
self._enable_sync_v1: Optional[bool] = None
self._enable_sync_v2: Optional[bool] = None

self._stratum_port: Optional[int] = None
self._enable_stratum_server: Optional[bool] = None

self._full_verification: Optional[bool] = None

Expand Down Expand Up @@ -152,9 +154,6 @@ def build(self) -> BuildArtifacts:
if self._enable_sync_v2 is not None:
kwargs['enable_sync_v2'] = self._enable_sync_v2

if self._stratum_port is not None:
kwargs['stratum_port'] = self._stratum_port

if self._network is None:
raise TypeError('you must set a network')

Expand All @@ -180,6 +179,10 @@ def build(self) -> BuildArtifacts:
**kwargs
)

stratum_factory: Optional[StratumFactory] = None
if self._enable_stratum_server:
stratum_factory = self._create_stratum_server(manager)

self.artifacts = BuildArtifacts(
peer_id=peer_id,
settings=settings,
Expand All @@ -193,6 +196,7 @@ def build(self) -> BuildArtifacts:
indexes=indexes,
wallet=wallet,
rocksdb_storage=self._rocksdb_storage,
stratum_factory=stratum_factory,
)

return self.artifacts
Expand Down Expand Up @@ -252,6 +256,12 @@ def _get_or_create_pubsub(self) -> PubSubManager:
self._pubsub = PubSubManager(self._get_reactor())
return self._pubsub

def _create_stratum_server(self, manager: HathorManager) -> StratumFactory:
stratum_factory = StratumFactory(manager=manager)
manager.stratum_factory = stratum_factory
manager.metrics.stratum_factory = stratum_factory
return stratum_factory

def _get_or_create_rocksdb_storage(self) -> RocksDBStorage:
assert self._rocksdb_path is not None

Expand Down Expand Up @@ -363,9 +373,9 @@ def enable_keypair_wallet(self, directory: str, *, unlock: Optional[bytes] = Non
self._wallet_unlock = unlock
return self

def enable_stratum_server(self, port: int) -> 'Builder':
def enable_stratum_server(self) -> 'Builder':
self.check_if_can_modify()
self._stratum_port = port
self._enable_stratum_server = True
return self

def enable_address_index(self) -> 'Builder':
Expand Down
7 changes: 6 additions & 1 deletion hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from hathor.p2p.peer_id import PeerId
from hathor.p2p.utils import discover_hostname
from hathor.pubsub import PubSubManager
from hathor.stratum import StratumFactory
from hathor.wallet import BaseWallet, HDWallet, Wallet

logger = get_logger()
Expand Down Expand Up @@ -176,7 +177,6 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
tx_storage=tx_storage,
event_manager=event_manager,
wallet=self.wallet,
stratum_port=args.stratum,
ssl=True,
checkpoints=settings.CHECKPOINTS,
enable_sync_v1=enable_sync_v1,
Expand All @@ -188,6 +188,11 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
enable_event_queue=bool(args.x_enable_event_queue)
)

if args.stratum:
stratum_factory = StratumFactory(self.manager)
self.manager.stratum_factory = stratum_factory
self.manager.metrics.stratum_factory = stratum_factory

if args.data:
self.manager.set_cmd_path(args.data)

Expand Down
6 changes: 6 additions & 0 deletions hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,14 @@ def prepare(self, args: Namespace, *, register_resources: bool = True) -> None:
except BuilderError as err:
self.log.error(str(err))
sys.exit(2)

self.tx_storage = self.manager.tx_storage
self.wallet = self.manager.wallet
self.start_manager(args)

if args.stratum:
self.reactor.listenTCP(args.stratum, self.manager.stratum_factory)

if register_resources:
resources_builder = ResourcesBuilder(self.manager, builder.event_ws_factory)
status_server = resources_builder.build(args)
Expand All @@ -163,6 +168,7 @@ def prepare(self, args: Namespace, *, register_resources: bool = True) -> None:
indexes=self.manager.tx_storage.indexes,
wallet=self.manager.wallet,
rocksdb_storage=getattr(builder, 'rocksdb_storage', None),
stratum_factory=self.manager.stratum_factory,
)

def start_sentry_if_possible(self, args: Namespace) -> None:
Expand Down
16 changes: 4 additions & 12 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from hathor.p2p.protocol import HathorProtocol
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.stratum import StratumFactory
from hathor.transaction import BaseTransaction, Block, MergeMinedBlock, Transaction, TxVersion, sum_weights
from hathor.transaction.exceptions import TxValidationError
from hathor.transaction.storage import TransactionStorage
Expand Down Expand Up @@ -91,7 +92,6 @@ def __init__(self,
network: str,
hostname: Optional[str] = None,
wallet: Optional[BaseWallet] = None,
stratum_port: Optional[int] = None,
ssl: bool = True,
enable_sync_v1: bool = False,
enable_sync_v1_1: bool = True,
Expand All @@ -113,9 +113,6 @@ def __init__(self,

:param tx_storage: Required storage backend.
:type tx_storage: :py:class:`hathor.transaction.storage.transaction_storage.TransactionStorage`

:param stratum_port: Stratum server port. Stratum server will only be created if it is not None.
:type stratum_port: Optional[int]
"""
from hathor.metrics import Metrics
from hathor.p2p.factory import HathorClientFactory, HathorServerFactory
Expand Down Expand Up @@ -210,14 +207,9 @@ def __init__(self,
self.wallet.pubsub = self.pubsub
self.wallet.reactor = self.reactor

if stratum_port:
# XXX: only import if needed
from hathor.stratum import StratumFactory
self.stratum_factory: Optional[StratumFactory] = StratumFactory(manager=self, port=stratum_port)
else:
self.stratum_factory = None
# Set stratum factory for metrics object
self.metrics.stratum_factory = self.stratum_factory
# It will be inject later by the builder.
# XXX Remove this attribute after all dependencies are cleared.
self.stratum_factory: Optional[StratumFactory] = None

self._allow_mining_without_peers = False

Expand Down
10 changes: 2 additions & 8 deletions hathor/stratum/stratum.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,16 +726,14 @@ class StratumFactory(Factory):
jobs: Set[UUID]
manager: 'HathorManager'
miner_protocols: Dict[UUID, StratumProtocol]
port: int
tx_queue: List[bytes]
mining_tx_pool: Dict[bytes, BaseTransaction]
mined_txs: Dict[bytes, Transaction]
deferreds_tx: Dict[bytes, Deferred]

def __init__(self, manager: 'HathorManager', port: int, reactor: Reactor = reactor):
def __init__(self, manager: 'HathorManager', reactor: Reactor = reactor):
self.log = logger.new()
self.manager = manager
self.port = port
self.reactor = reactor

self.jobs = set()
Expand Down Expand Up @@ -769,13 +767,9 @@ def on_new_block(event: HathorEvents, args: EventArguments) -> None:
self.update_jobs()

self.manager.pubsub.subscribe(HathorEvents.NETWORK_NEW_TX_ACCEPTED, on_new_block)
# XXX: self.reactor is IReactorTime, which does not guarantee listenTCP method, normally it will have that
# method, but on tests we use a Clock instead, which does not have listenTCP, there shouldn't be any
# issues using the "default" reactor though
self._listen = reactor.listenTCP(self.port, self)

def stop(self) -> Optional[Deferred]:
return self._listen.stopListening()
return None

def mine_transaction(self, tx: Transaction, deferred: Deferred) -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/others/test_init_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_invalid_arguments(self):
def tests_init_with_stratum(self):
builder = TestBuilder()
builder.set_tx_storage(self.tx_storage)
builder.enable_stratum_server(50505)
builder.enable_stratum_server()
artifacts = builder.build()
manager = artifacts.manager
manager.start()
Expand Down
5 changes: 0 additions & 5 deletions tests/resources/test_stratum.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@


class StratumResourceTest(_BaseResourceTest._ResourceTest):
def _manager_kwargs(self):
kwargs = super()._manager_kwargs()
kwargs['stratum_port'] = 8123
return kwargs

def setUp(self):
super().setUp()
self.web = StubSite(MiningStatsResource(self.manager))
Expand Down
3 changes: 1 addition & 2 deletions tests/tx/test_stratum.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ def setUp(self):
super().setUp()
self.manager = self.create_peer('testnet')
self.manager.allow_mining_without_peers()
port = self.rng.randint(8000, 9000)
self.factory = StratumFactory(self.manager, port=port, reactor=MemoryReactorHeapClock())
self.factory = StratumFactory(self.manager, reactor=MemoryReactorHeapClock())
self.factory.start()
self.protocol = self.factory.buildProtocol('127.0.0.1')
self.transport = StringTransportWithDisconnection()
Expand Down