diff --git a/extras/custom_checks.sh b/extras/custom_checks.sh index b3c635d3d..ece887832 100644 --- a/extras/custom_checks.sh +++ b/extras/custom_checks.sh @@ -89,12 +89,38 @@ function check_do_not_import_tests_in_hathor() { return 0 } +function check_do_not_import_from_hathor_in_entrypoints() { + PATTERN='^import .*hathor.*\|^from .*hathor.* import' + + if grep -R "$PATTERN" "hathor/cli" | grep -v 'from hathor.cli.run_node import RunNode' | grep -v '# skip-cli-import-custom-check'; then + echo 'do not import from `hathor` in the module-level of a CLI entrypoint.' + echo 'instead, import locally inside the function that uses the import.' + echo 'alternatively, comment `# skip-cli-import-custom-check` to exclude a line.' + return 1 + fi + return 0 +} + +function check_do_not_import_twisted_reactor_directly() { + EXCLUDES="--exclude=reactor.py --exclude=conftest.py" + PATTERN='\<.*from .*twisted.internet import .*reactor\>' + + if grep -R $EXCLUDES "$PATTERN" "${SOURCE_DIRS[@]}"; then + echo 'do not use `from twisted.internet import reactor` directly.' + echo 'instead, use `hathor.reactor.get_global_reactor()`.' + return 1 + fi + return 0 +} + # List of functions to be executed checks=( check_version_match check_do_not_use_builtin_random_in_tests check_deprecated_typing check_do_not_import_tests_in_hathor + check_do_not_import_from_hathor_in_entrypoints + check_do_not_import_twisted_reactor_directly ) # Initialize a variable to track if any check fails diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index 7bc00f13d..d8ee85522 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -34,6 +34,7 @@ from hathor.p2p.manager import ConnectionsManager from hathor.p2p.peer_id import PeerId from hathor.pubsub import PubSubManager +from hathor.reactor import ReactorProtocol as Reactor from hathor.storage import RocksDBStorage from hathor.stratum import StratumFactory from hathor.transaction.storage import ( @@ -42,7 +43,7 @@ TransactionRocksDBStorage, TransactionStorage, ) -from hathor.util import Random, Reactor, get_environment_info, not_none +from hathor.util import Random, get_environment_info, not_none from hathor.verification.verification_service import VerificationService, VertexVerifiers from hathor.wallet import BaseWallet, Wallet @@ -311,7 +312,7 @@ def _get_or_create_pubsub(self) -> PubSubManager: return self._pubsub def _create_stratum_server(self, manager: HathorManager) -> StratumFactory: - stratum_factory = StratumFactory(manager=manager) + stratum_factory = StratumFactory(manager=manager, reactor=self._get_reactor()) manager.stratum_factory = stratum_factory manager.metrics.stratum_factory = stratum_factory return stratum_factory diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index 0e9ea9d04..8d3463176 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -21,7 +21,7 @@ from structlog import get_logger -from hathor.cli.run_node import RunNodeArgs +from hathor.cli.run_node_args import RunNodeArgs from hathor.consensus import ConsensusAlgorithm from hathor.daa import DifficultyAdjustmentAlgorithm from hathor.event import EventManager @@ -35,8 +35,9 @@ from hathor.p2p.peer_id import PeerId from hathor.p2p.utils import discover_hostname, get_genesis_short_hash from hathor.pubsub import PubSubManager +from hathor.reactor import ReactorProtocol as Reactor from hathor.stratum import StratumFactory -from hathor.util import Random, Reactor, not_none +from hathor.util import Random, not_none from hathor.verification.verification_service import VerificationService from hathor.verification.vertex_verifiers import VertexVerifiers from hathor.wallet import BaseWallet, HDWallet, Wallet @@ -99,6 +100,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager: python=python, platform=platform.platform(), settings=settings_source, + reactor_type=type(reactor).__name__, ) tx_storage: TransactionStorage @@ -271,7 +273,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager: p2p_manager.set_manager(self.manager) if self._args.stratum: - stratum_factory = StratumFactory(self.manager) + stratum_factory = StratumFactory(self.manager, reactor=reactor) self.manager.stratum_factory = stratum_factory self.manager.metrics.stratum_factory = stratum_factory diff --git a/hathor/builder/resources_builder.py b/hathor/builder/resources_builder.py index 46d46023d..ce92ccaa1 100644 --- a/hathor/builder/resources_builder.py +++ b/hathor/builder/resources_builder.py @@ -224,15 +224,15 @@ def create_resources(self) -> server.Site: root.putChild(b'_debug', debug_resource) resources.extend([ (b'log', DebugLogResource(), debug_resource), - (b'raise', DebugRaiseResource(), debug_resource), - (b'reject', DebugRejectResource(), debug_resource), + (b'raise', DebugRaiseResource(self.manager.reactor), debug_resource), + (b'reject', DebugRejectResource(self.manager.reactor), debug_resource), (b'print', DebugPrintResource(), debug_resource), ]) if self._args.enable_crash_api: crash_resource = Resource() root.putChild(b'_crash', crash_resource) resources.extend([ - (b'exit', DebugCrashResource(), crash_resource), + (b'exit', DebugCrashResource(self.manager.reactor), crash_resource), (b'mess_around', DebugMessAroundResource(self.manager), crash_resource), ]) diff --git a/hathor/cli/db_import.py b/hathor/cli/db_import.py index 84a51d185..a43132c40 100644 --- a/hathor/cli/db_import.py +++ b/hathor/cli/db_import.py @@ -18,7 +18,6 @@ from argparse import ArgumentParser, FileType from typing import TYPE_CHECKING, Iterator -from hathor.cli.db_export import MAGIC_HEADER from hathor.cli.run_node import RunNode if TYPE_CHECKING: @@ -46,6 +45,7 @@ def prepare(self, *, register_resources: bool = True) -> None: self.in_file = io.BufferedReader(self._args.import_file) def run(self) -> None: + from hathor.cli.db_export import MAGIC_HEADER from hathor.util import tx_progress header = self.in_file.read(len(MAGIC_HEADER)) diff --git a/hathor/cli/events_simulator/event_forwarding_websocket_factory.py b/hathor/cli/events_simulator/event_forwarding_websocket_factory.py index 15e5e70d1..5b1dad9b3 100644 --- a/hathor/cli/events_simulator/event_forwarding_websocket_factory.py +++ b/hathor/cli/events_simulator/event_forwarding_websocket_factory.py @@ -12,21 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any +from typing import TYPE_CHECKING, Any from twisted.internet.interfaces import IAddress -from hathor.cli.events_simulator.event_forwarding_websocket_protocol import EventForwardingWebsocketProtocol -from hathor.event.websocket import EventWebsocketFactory -from hathor.simulator import Simulator +from hathor.event.websocket import EventWebsocketFactory # skip-cli-import-custom-check + +if TYPE_CHECKING: + from hathor.cli.events_simulator.event_forwarding_websocket_protocol import EventForwardingWebsocketProtocol + from hathor.simulator import Simulator class EventForwardingWebsocketFactory(EventWebsocketFactory): - def __init__(self, simulator: Simulator, *args: Any, **kwargs: Any) -> None: + def __init__(self, simulator: 'Simulator', *args: Any, **kwargs: Any) -> None: self._simulator = simulator super().__init__(*args, **kwargs) - def buildProtocol(self, _: IAddress) -> EventForwardingWebsocketProtocol: + def buildProtocol(self, _: IAddress) -> 'EventForwardingWebsocketProtocol': protocol = EventForwardingWebsocketProtocol(self._simulator) protocol.factory = self return protocol diff --git a/hathor/cli/events_simulator/event_forwarding_websocket_protocol.py b/hathor/cli/events_simulator/event_forwarding_websocket_protocol.py index da3530572..cd6318a0b 100644 --- a/hathor/cli/events_simulator/event_forwarding_websocket_protocol.py +++ b/hathor/cli/events_simulator/event_forwarding_websocket_protocol.py @@ -16,17 +16,17 @@ from autobahn.websocket import ConnectionRequest -from hathor.event.websocket import EventWebsocketProtocol -from hathor.simulator import Simulator +from hathor.event.websocket import EventWebsocketProtocol # skip-cli-import-custom-check if TYPE_CHECKING: from hathor.cli.events_simulator.event_forwarding_websocket_factory import EventForwardingWebsocketFactory + from hathor.simulator import Simulator class EventForwardingWebsocketProtocol(EventWebsocketProtocol): factory: 'EventForwardingWebsocketFactory' - def __init__(self, simulator: Simulator) -> None: + def __init__(self, simulator: 'Simulator') -> None: self._simulator = simulator super().__init__() diff --git a/hathor/cli/events_simulator/events_simulator.py b/hathor/cli/events_simulator/events_simulator.py index f7379d7a4..600232429 100644 --- a/hathor/cli/events_simulator/events_simulator.py +++ b/hathor/cli/events_simulator/events_simulator.py @@ -44,8 +44,8 @@ def execute(args: Namespace) -> None: os.environ['HATHOR_CONFIG_YAML'] = UNITTESTS_SETTINGS_FILEPATH from hathor.cli.events_simulator.event_forwarding_websocket_factory import EventForwardingWebsocketFactory from hathor.cli.events_simulator.scenario import Scenario + from hathor.reactor import get_global_reactor from hathor.simulator import Simulator - from hathor.util import reactor try: scenario = Scenario[args.scenario] @@ -53,6 +53,7 @@ def execute(args: Namespace) -> None: possible_scenarios = [scenario.name for scenario in Scenario] raise ValueError(f'Invalid scenario "{args.scenario}". Choose one of {possible_scenarios}') from e + reactor = get_global_reactor() log = logger.new() simulator = Simulator(args.seed) simulator.start() diff --git a/hathor/cli/mining.py b/hathor/cli/mining.py index 63ab8757c..1fbd8a927 100644 --- a/hathor/cli/mining.py +++ b/hathor/cli/mining.py @@ -24,8 +24,6 @@ import requests -from hathor.conf.get_settings import get_settings - _SLEEP_ON_ERROR_SECONDS = 5 _MAX_CONN_RETRIES = math.inf @@ -137,6 +135,7 @@ def execute(args: Namespace) -> None: block.nonce, block.weight)) try: + from hathor.conf.get_settings import get_settings from hathor.daa import DifficultyAdjustmentAlgorithm from hathor.verification.verification_service import VerificationService, VertexVerifiers settings = get_settings() diff --git a/hathor/cli/multisig_spend.py b/hathor/cli/multisig_spend.py index cd9600097..6b7fcdc57 100644 --- a/hathor/cli/multisig_spend.py +++ b/hathor/cli/multisig_spend.py @@ -14,8 +14,6 @@ from argparse import ArgumentParser, Namespace -from hathor.mining.cpu_mining_service import CpuMiningService - def create_parser() -> ArgumentParser: from hathor.cli.util import create_parser @@ -29,6 +27,7 @@ def create_parser() -> ArgumentParser: def execute(args: Namespace) -> None: + from hathor.mining.cpu_mining_service import CpuMiningService from hathor.transaction import Transaction from hathor.transaction.scripts import MultiSig diff --git a/hathor/cli/nginx_config.py b/hathor/cli/nginx_config.py index 18a6f4afe..974d0f74c 100644 --- a/hathor/cli/nginx_config.py +++ b/hathor/cli/nginx_config.py @@ -17,8 +17,6 @@ from enum import Enum from typing import Any, NamedTuple, Optional, TextIO -from hathor.cli.openapi_json import get_openapi_dict - BASE_PATH = os.path.join(os.path.dirname(__file__), 'nginx_files') @@ -26,6 +24,7 @@ def get_openapi(src_file: Optional[TextIO] = None) -> dict[str, Any]: """ Open and parse the json file or generate OpenAPI dict on-the-fly """ if src_file is None: + from hathor.cli.openapi_json import get_openapi_dict return get_openapi_dict() else: return json.load(src_file) diff --git a/hathor/cli/openapi_files/register.py b/hathor/cli/openapi_files/register.py index 7ce9afac6..77dc29b87 100644 --- a/hathor/cli/openapi_files/register.py +++ b/hathor/cli/openapi_files/register.py @@ -14,7 +14,7 @@ from typing import TypeVar -from hathor.api_util import Resource +from hathor.api_util import Resource # skip-cli-import-custom-check _registered_resources: list[type[Resource]] = [] diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py index 40d1004ca..55b9c1730 100644 --- a/hathor/cli/run_node.py +++ b/hathor/cli/run_node.py @@ -15,27 +15,26 @@ import os import sys from argparse import SUPPRESS, ArgumentParser, Namespace -from typing import Any, Callable, Optional +from typing import TYPE_CHECKING, Any, Callable, Optional from pydantic import ValidationError from structlog import get_logger -from hathor.cli.run_node_args import RunNodeArgs -from hathor.conf import TESTNET_SETTINGS_FILEPATH, HathorSettings -from hathor.exception import PreInitializationError -from hathor.feature_activation.feature import Feature - logger = get_logger() # LOGGING_CAPTURE_STDOUT = True +if TYPE_CHECKING: + from hathor.cli.run_node_args import RunNodeArgs + class RunNode: - UNSAFE_ARGUMENTS: list[tuple[str, Callable[[RunNodeArgs], bool]]] = [ + UNSAFE_ARGUMENTS: list[tuple[str, Callable[['RunNodeArgs'], bool]]] = [ ('--test-mode-tx-weight', lambda args: bool(args.test_mode_tx_weight)), ('--enable-crash-api', lambda args: bool(args.enable_crash_api)), ('--x-sync-bridge', lambda args: bool(args.x_sync_bridge)), ('--x-sync-v2-only', lambda args: bool(args.x_sync_v2_only)), - ('--x-enable-event-queue', lambda args: bool(args.x_enable_event_queue)) + ('--x-enable-event-queue', lambda args: bool(args.x_enable_event_queue)), + ('--x-asyncio-reactor', lambda args: bool(args.x_asyncio_reactor)) ] @classmethod @@ -45,6 +44,7 @@ def create_parser(cls) -> ArgumentParser: Arguments must also be added to hathor.cli.run_node_args.RunNodeArgs """ from hathor.cli.util import create_parser + from hathor.feature_activation.feature import Feature parser = create_parser() parser.add_argument('--hostname', help='Hostname used to be accessed by other peers') @@ -118,6 +118,8 @@ def create_parser(cls) -> ArgumentParser: help=f'Signal support for a feature. One of {possible_features}') parser.add_argument('--signal-not-support', default=[], action='append', choices=possible_features, help=f'Signal not support for a feature. One of {possible_features}') + parser.add_argument('--x-asyncio-reactor', action='store_true', + help='Use asyncio reactor instead of Twisted\'s default.') return parser def prepare(self, *, register_resources: bool = True) -> None: @@ -139,7 +141,8 @@ def prepare(self, *, register_resources: bool = True) -> None: self.check_unsafe_arguments() self.check_python_version() - from hathor.util import reactor + from hathor.reactor import initialize_global_reactor + reactor = initialize_global_reactor(use_asyncio_reactor=self._args.x_asyncio_reactor) self.reactor = reactor from hathor.builder import CliBuilder, ResourcesBuilder @@ -346,6 +349,9 @@ def check_python_version(self) -> None: ])) def __init__(self, *, argv=None): + from hathor.cli.run_node_args import RunNodeArgs + from hathor.conf import TESTNET_SETTINGS_FILEPATH + from hathor.conf.get_settings import get_settings self.log = logger.new() if argv is None: @@ -363,8 +369,9 @@ def __init__(self, *, argv=None): os.environ['HATHOR_CONFIG_YAML'] = TESTNET_SETTINGS_FILEPATH try: - HathorSettings() + get_settings() except (TypeError, ValidationError) as e: + from hathor.exception import PreInitializationError raise PreInitializationError( 'An error was found while trying to initialize HathorSettings. See above for details.' ) from e diff --git a/hathor/cli/run_node_args.py b/hathor/cli/run_node_args.py index bde32a6e8..897555fbb 100644 --- a/hathor/cli/run_node_args.py +++ b/hathor/cli/run_node_args.py @@ -16,8 +16,8 @@ from pydantic import Extra -from hathor.feature_activation.feature import Feature -from hathor.utils.pydantic import BaseModel +from hathor.feature_activation.feature import Feature # skip-cli-import-custom-check +from hathor.utils.pydantic import BaseModel # skip-cli-import-custom-check class RunNodeArgs(BaseModel, extra=Extra.allow): @@ -72,3 +72,4 @@ class RunNodeArgs(BaseModel, extra=Extra.allow): config_yaml: Optional[str] signal_support: set[Feature] signal_not_support: set[Feature] + x_asyncio_reactor: bool diff --git a/hathor/cli/stratum_mining.py b/hathor/cli/stratum_mining.py index 06bd53203..799a210dc 100644 --- a/hathor/cli/stratum_mining.py +++ b/hathor/cli/stratum_mining.py @@ -30,8 +30,8 @@ def create_parser() -> ArgumentParser: def execute(args: Namespace) -> None: from hathor.crypto.util import decode_address + from hathor.reactor import get_global_reactor from hathor.stratum import StratumClient - from hathor.util import reactor from hathor.wallet.exceptions import InvalidAddress address = None @@ -43,7 +43,8 @@ def execute(args: Namespace) -> None: print('The given address is invalid') sys.exit(-1) - miner = StratumClient(proc_count=args.nproc, address=address) + reactor = get_global_reactor() + miner = StratumClient(proc_count=args.nproc, address=address, reactor=reactor) miner.start() point = TCP4ClientEndpoint(reactor, args.host, args.port) connectProtocol(point, miner) diff --git a/hathor/cli/twin_tx.py b/hathor/cli/twin_tx.py index f55274368..f57c4ee97 100644 --- a/hathor/cli/twin_tx.py +++ b/hathor/cli/twin_tx.py @@ -19,8 +19,6 @@ import requests -from hathor.mining.cpu_mining_service import CpuMiningService - def create_parser() -> ArgumentParser: from hathor.cli.util import create_parser @@ -36,6 +34,7 @@ def create_parser() -> ArgumentParser: def execute(args: Namespace) -> None: + from hathor.mining.cpu_mining_service import CpuMiningService from hathor.transaction import Transaction # Get tx you want to create a twin diff --git a/hathor/debug_resources.py b/hathor/debug_resources.py index 6e050a63c..5d0707b2f 100644 --- a/hathor/debug_resources.py +++ b/hathor/debug_resources.py @@ -24,7 +24,7 @@ from hathor.cli.openapi_files.register import register_resource from hathor.exception import HathorError from hathor.manager import HathorManager -from hathor.util import reactor +from hathor.reactor import ReactorProtocol from hathor.utils.zope import asserted_cast logger = get_logger() @@ -54,6 +54,10 @@ class DebugRaiseResource(Resource): } default_msg = 'exception raised for debugging purposes' + def __init__(self, reactor: ReactorProtocol) -> None: + super().__init__() + self._reactor = reactor + def run(self, exc_cls: type[BaseException], msg: str) -> None: raise exc_cls(msg) @@ -63,7 +67,7 @@ def render_GET(self, request: Request) -> bytes: assert exc_cls_name in self.exc_class_map exc_cls = self.exc_class_map[exc_cls_name] msg = get_arg_default(raw_args, 'msg', self.default_msg) - threaded_reactor = asserted_cast(IReactorFromThreads, reactor) + threaded_reactor = asserted_cast(IReactorFromThreads, self._reactor) threaded_reactor.callFromThread(self.run, exc_cls, msg) return b'OK: no side-effects\n' @@ -188,7 +192,7 @@ def render_GET(self, request: Request) -> bytes: mess = get_arg_default(get_args(request), 'with', self.default_mess) assert mess in self.mess_map mess_func = self.mess_map[mess] - threaded_reactor = asserted_cast(IReactorFromThreads, reactor) + threaded_reactor = asserted_cast(IReactorFromThreads, self.manager.reactor) threaded_reactor.callFromThread(mess_func) return b'OK: database yanked, full-node will break\n' @@ -208,6 +212,10 @@ class DebugCrashResource(Resource): } } + def __init__(self, reactor: ReactorProtocol) -> None: + super().__init__() + self._reactor = reactor + def run(self, code: int) -> None: # XXX: sys.exit will raise a SystemExit exception that get's trapped by twisted # os._exit will bypass that by exiting directly, note that no cleanup methods will be called @@ -215,5 +223,5 @@ def run(self, code: int) -> None: def render_GET(self, request: Request) -> bytes: code = get_arg_default(get_args(request), 'code', -1) - reactor.callLater(1.0, self.run, code) + self._reactor.callLater(1.0, self.run, code) return b'OK: full-node will exit and probably break database\n' diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index 7338d256e..6306707c6 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -23,8 +23,9 @@ from hathor.event.storage import EventStorage from hathor.event.websocket import EventWebsocketFactory from hathor.pubsub import EventArguments, HathorEvents, PubSubManager +from hathor.reactor import ReactorProtocol as Reactor from hathor.transaction import BaseTransaction -from hathor.util import Reactor, not_none, progress +from hathor.util import not_none, progress from hathor.utils.iter import batch_iterator logger = get_logger() diff --git a/hathor/event/websocket/factory.py b/hathor/event/websocket/factory.py index 075ef49ed..2bc2724e7 100644 --- a/hathor/event/websocket/factory.py +++ b/hathor/event/websocket/factory.py @@ -21,7 +21,8 @@ from hathor.event.storage import EventStorage from hathor.event.websocket.protocol import EventWebsocketProtocol from hathor.event.websocket.response import EventResponse, InvalidRequestType -from hathor.util import Reactor, not_none +from hathor.reactor import ReactorProtocol as Reactor +from hathor.util import not_none logger = get_logger() diff --git a/hathor/manager.py b/hathor/manager.py index 7f43c97ee..43963478a 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -50,6 +50,7 @@ from hathor.p2p.protocol import HathorProtocol from hathor.profiler import get_cpu_profiler from hathor.pubsub import HathorEvents, PubSubManager +from hathor.reactor import ReactorProtocol as Reactor from hathor.stratum import StratumFactory from hathor.transaction import BaseTransaction, Block, MergeMinedBlock, Transaction, TxVersion, sum_weights from hathor.transaction.exceptions import TxValidationError @@ -57,7 +58,7 @@ from hathor.transaction.storage.exceptions import TransactionDoesNotExist from hathor.transaction.storage.tx_allow_scope import TxAllowScope from hathor.types import Address, VertexId -from hathor.util import EnvironmentInfo, LogDuration, Random, Reactor, calculate_min_significant_weight, not_none +from hathor.util import EnvironmentInfo, LogDuration, Random, calculate_min_significant_weight, not_none from hathor.verification.verification_service import VerificationService from hathor.wallet import BaseWallet diff --git a/hathor/metrics.py b/hathor/metrics.py index 2c27955be..64ee2bd08 100644 --- a/hathor/metrics.py +++ b/hathor/metrics.py @@ -22,11 +22,11 @@ from hathor.conf import HathorSettings from hathor.p2p.manager import ConnectionsManager, PeerConnectionsMetrics from hathor.pubsub import EventArguments, HathorEvents, PubSubManager +from hathor.reactor import ReactorProtocol as Reactor from hathor.transaction.base_transaction import sum_weights from hathor.transaction.block import Block from hathor.transaction.storage import TransactionRocksDBStorage, TransactionStorage from hathor.transaction.storage.cache_storage import TransactionCacheStorage -from hathor.util import Reactor if TYPE_CHECKING: from hathor.stratum import StratumFactory # noqa: F401 @@ -63,7 +63,7 @@ class Metrics: connections: ConnectionsManager tx_storage: TransactionStorage # Twisted reactor that handles the time and callLater - reactor: Optional[Reactor] = None + reactor: Reactor # Transactions count in the network transactions: int = 0 @@ -127,10 +127,6 @@ def __post_init__(self) -> None: # Stores caculated block weights saved in tx storage self.weight_block_deque: deque[WeightValue] = deque(maxlen=self.weight_block_deque_len) - if self.reactor is None: - from hathor.util import reactor as twisted_reactor - self.reactor = twisted_reactor - # A timer to periodically collect data self._lc_collect_data = LoopingCall(self._collect_data) self._lc_collect_data.clock = self.reactor diff --git a/hathor/p2p/manager.py b/hathor/p2p/manager.py index ad5df083e..f7c60b1bb 100644 --- a/hathor/p2p/manager.py +++ b/hathor/p2p/manager.py @@ -34,8 +34,9 @@ from hathor.p2p.sync_version import SyncVersion from hathor.p2p.utils import description_to_connection_string, parse_whitelist from hathor.pubsub import HathorEvents, PubSubManager +from hathor.reactor import ReactorProtocol as Reactor from hathor.transaction import BaseTransaction -from hathor.util import Random, Reactor +from hathor.util import Random if TYPE_CHECKING: from twisted.internet.interfaces import IDelayedCall diff --git a/hathor/p2p/netfilter/matches_remote.py b/hathor/p2p/netfilter/matches_remote.py index 79b011e20..734c4ea13 100644 --- a/hathor/p2p/netfilter/matches_remote.py +++ b/hathor/p2p/netfilter/matches_remote.py @@ -20,7 +20,7 @@ from twisted.internet.task import LoopingCall from hathor.p2p.netfilter.matches import NetfilterMatch, NetfilterMatchIPAddress -from hathor.util import Reactor +from hathor.reactor import ReactorProtocol as Reactor if TYPE_CHECKING: from hathor.p2p.netfilter.context import NetfilterContext diff --git a/hathor/p2p/rate_limiter.py b/hathor/p2p/rate_limiter.py index f341ffb91..defbd9342 100644 --- a/hathor/p2p/rate_limiter.py +++ b/hathor/p2p/rate_limiter.py @@ -14,7 +14,7 @@ from typing import NamedTuple, Optional -from hathor.util import Reactor +from hathor.reactor import ReactorProtocol as Reactor class RateLimiterLimit(NamedTuple): @@ -32,12 +32,9 @@ class RateLimiter: # Stores the last hit for each key hits: dict[str, RateLimiterLimit] - def __init__(self, reactor: Optional[Reactor] = None): + def __init__(self, reactor: Reactor): self.keys = {} self.hits = {} - if reactor is None: - from hathor.util import reactor as twisted_reactor - reactor = twisted_reactor self.reactor = reactor def set_limit(self, key: str, max_hits: int, window_seconds: float) -> None: diff --git a/hathor/p2p/sync_factory.py b/hathor/p2p/sync_factory.py index 4f04a734b..f4883f21a 100644 --- a/hathor/p2p/sync_factory.py +++ b/hathor/p2p/sync_factory.py @@ -13,10 +13,10 @@ # limitations under the License. from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING from hathor.p2p.sync_agent import SyncAgent -from hathor.util import Reactor +from hathor.reactor import ReactorProtocol as Reactor if TYPE_CHECKING: from hathor.p2p.protocol import HathorProtocol @@ -24,5 +24,5 @@ class SyncAgentFactory(ABC): @abstractmethod - def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncAgent: + def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Reactor) -> SyncAgent: pass diff --git a/hathor/p2p/sync_v1/agent.py b/hathor/p2p/sync_v1/agent.py index f9757f7e7..cb300907c 100644 --- a/hathor/p2p/sync_v1/agent.py +++ b/hathor/p2p/sync_v1/agent.py @@ -26,10 +26,11 @@ from hathor.p2p.messages import GetNextPayload, GetTipsPayload, NextPayload, ProtocolMessages, TipsPayload from hathor.p2p.sync_agent import SyncAgent from hathor.p2p.sync_v1.downloader import Downloader +from hathor.reactor import ReactorProtocol as Reactor from hathor.transaction import BaseTransaction from hathor.transaction.base_transaction import tx_or_block_from_bytes from hathor.transaction.storage.exceptions import TransactionDoesNotExist -from hathor.util import Reactor, json_dumps, json_loads +from hathor.util import json_dumps, json_loads logger = get_logger() @@ -59,7 +60,7 @@ class NodeSyncTimestamp(SyncAgent): MAX_HASHES: int = 40 - def __init__(self, protocol: 'HathorProtocol', downloader: Downloader, reactor: Optional[Reactor] = None) -> None: + def __init__(self, protocol: 'HathorProtocol', downloader: Downloader, reactor: Reactor) -> None: """ :param protocol: Protocol of the connection. :type protocol: HathorProtocol @@ -72,9 +73,6 @@ def __init__(self, protocol: 'HathorProtocol', downloader: Downloader, reactor: self.manager = protocol.node self.downloader = downloader - if reactor is None: - from hathor.util import reactor as twisted_reactor - reactor = twisted_reactor self.reactor: Reactor = reactor # Rate limit for this connection. diff --git a/hathor/p2p/sync_v1/factory.py b/hathor/p2p/sync_v1/factory.py index 57d8819ae..d6fa55deb 100644 --- a/hathor/p2p/sync_v1/factory.py +++ b/hathor/p2p/sync_v1/factory.py @@ -19,7 +19,7 @@ from hathor.p2p.sync_factory import SyncAgentFactory from hathor.p2p.sync_v1.agent import NodeSyncTimestamp from hathor.p2p.sync_v1.downloader import Downloader -from hathor.util import Reactor +from hathor.reactor import ReactorProtocol as Reactor if TYPE_CHECKING: from hathor.p2p.protocol import HathorProtocol @@ -36,5 +36,5 @@ def get_downloader(self) -> Downloader: self._downloader = Downloader(self.connections.manager) return self._downloader - def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncAgent: + def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Reactor) -> SyncAgent: return NodeSyncTimestamp(protocol, downloader=self.get_downloader(), reactor=reactor) diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index 3f14c8062..c93ee57eb 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -37,11 +37,12 @@ TransactionsStreamingServer, ) from hathor.p2p.sync_v2.transaction_streaming_client import TransactionStreamingClient +from hathor.reactor import ReactorProtocol as Reactor from hathor.transaction import BaseTransaction, Block, Transaction from hathor.transaction.base_transaction import tx_or_block_from_bytes from hathor.transaction.storage.exceptions import TransactionDoesNotExist from hathor.types import VertexId -from hathor.util import Reactor, not_none +from hathor.util import not_none if TYPE_CHECKING: from hathor.p2p.protocol import HathorProtocol @@ -82,7 +83,7 @@ class NodeBlockSync(SyncAgent): """ name: str = 'node-block-sync' - def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> None: + def __init__(self, protocol: 'HathorProtocol', reactor: Reactor) -> None: """ :param protocol: Protocol of the connection. :type protocol: HathorProtocol @@ -98,10 +99,6 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None self.DEFAULT_STREAMING_LIMIT = DEFAULT_STREAMING_LIMIT - if reactor is None: - from hathor.util import reactor as twisted_reactor - reactor = twisted_reactor - assert reactor is not None self.reactor: Reactor = reactor self._is_streaming: bool = False diff --git a/hathor/p2p/sync_v2/factory.py b/hathor/p2p/sync_v2/factory.py index defb37283..71f17dd87 100644 --- a/hathor/p2p/sync_v2/factory.py +++ b/hathor/p2p/sync_v2/factory.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING from hathor.p2p.manager import ConnectionsManager from hathor.p2p.sync_agent import SyncAgent from hathor.p2p.sync_factory import SyncAgentFactory from hathor.p2p.sync_v2.agent import NodeBlockSync -from hathor.util import Reactor +from hathor.reactor import ReactorProtocol as Reactor if TYPE_CHECKING: from hathor.p2p.protocol import HathorProtocol @@ -28,5 +28,5 @@ class SyncV2Factory(SyncAgentFactory): def __init__(self, connections: ConnectionsManager): self.connections = connections - def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncAgent: + def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Reactor) -> SyncAgent: return NodeBlockSync(protocol, reactor=reactor) diff --git a/hathor/p2p/sync_v2/transaction_streaming_client.py b/hathor/p2p/sync_v2/transaction_streaming_client.py index f83ad68c0..b46ea546b 100644 --- a/hathor/p2p/sync_v2/transaction_streaming_client.py +++ b/hathor/p2p/sync_v2/transaction_streaming_client.py @@ -167,18 +167,18 @@ def _process_transaction(self, tx: BaseTransaction) -> Generator[Any, Any, None] if tx.hash in self._db: # This case might happen during a resume, so we just log and keep syncing. self.log.debug('duplicated vertex received', tx_id=tx.hash.hex()) - self.update_dependencies(tx) + self._update_dependencies(tx) elif tx.hash in self._existing_deps: # This case might happen if we already have the transaction from another sync. self.log.debug('existing vertex received', tx_id=tx.hash.hex()) - self.update_dependencies(tx) + self._update_dependencies(tx) else: self.log.info('unexpected vertex received', tx_id=tx.hash.hex()) self.fails(UnexpectedVertex(tx.hash.hex())) return self._waiting_for.remove(tx.hash) - self.update_dependencies(tx) + self._update_dependencies(tx) self._db[tx.hash] = tx @@ -194,7 +194,7 @@ def _process_transaction(self, tx: BaseTransaction) -> Generator[Any, Any, None] if self._tx_received % 100 == 0: self.log.debug('tx streaming in progress', txs_received=self._tx_received) - def update_dependencies(self, tx: BaseTransaction) -> None: + def _update_dependencies(self, tx: BaseTransaction) -> None: """Update _existing_deps and _waiting_for with the dependencies.""" for dep in tx.get_all_dependencies(): if self.tx_storage.transaction_exists(dep) or dep in self._db: @@ -249,7 +249,4 @@ def _prepare_block(self, blk: 'Block') -> None: self._db.clear() self._existing_deps.clear() - # Add pending dependencies from block. - for dep in blk.get_all_dependencies(): - if not self.tx_storage.transaction_exists(dep): - self._waiting_for.add(dep) + self._update_dependencies(blk) diff --git a/hathor/prometheus.py b/hathor/prometheus.py index 6bd9637d1..63c1a7727 100644 --- a/hathor/prometheus.py +++ b/hathor/prometheus.py @@ -19,7 +19,7 @@ from twisted.internet.task import LoopingCall from hathor.conf.get_settings import get_settings -from hathor.util import reactor +from hathor.reactor import get_global_reactor if TYPE_CHECKING: from hathor.metrics import Metrics @@ -102,7 +102,7 @@ def __init__(self, metrics: 'Metrics', path: str, filename: str = 'hathor.prom', # A timer to periodically write data to prometheus self._lc_write_data = LoopingCall(self._write_data) - self._lc_write_data.clock = reactor + self._lc_write_data.clock = get_global_reactor() def _initial_setup(self) -> None: """ Start a collector registry to send data to node exporter diff --git a/hathor/pubsub.py b/hathor/pubsub.py index f598f4998..0a3168aa7 100644 --- a/hathor/pubsub.py +++ b/hathor/pubsub.py @@ -20,7 +20,7 @@ from twisted.internet.interfaces import IDelayedCall, IReactorFromThreads from twisted.python.threadable import isInIOThread -from hathor.util import Reactor +from hathor.reactor import ReactorProtocol as Reactor from hathor.utils.zope import verified_cast if TYPE_CHECKING: diff --git a/hathor/reactor/__init__.py b/hathor/reactor/__init__.py index e69de29bb..d87649a98 100644 --- a/hathor/reactor/__init__.py +++ b/hathor/reactor/__init__.py @@ -0,0 +1,22 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.reactor.reactor import get_global_reactor, initialize_global_reactor +from hathor.reactor.reactor_protocol import ReactorProtocol + +__all__ = [ + 'initialize_global_reactor', + 'get_global_reactor', + 'ReactorProtocol', +] diff --git a/hathor/reactor/reactor.py b/hathor/reactor/reactor.py index 10bda8e98..e94dc3a97 100644 --- a/hathor/reactor/reactor.py +++ b/hathor/reactor/reactor.py @@ -14,18 +14,72 @@ from typing import cast -from twisted.internet import reactor as twisted_reactor +from structlog import get_logger from twisted.internet.interfaces import IReactorCore, IReactorTCP, IReactorTime from zope.interface.verify import verifyObject from hathor.reactor.reactor_protocol import ReactorProtocol -assert verifyObject(IReactorTime, twisted_reactor) is True -assert verifyObject(IReactorCore, twisted_reactor) is True -assert verifyObject(IReactorTCP, twisted_reactor) is True +logger = get_logger() -""" -This variable is the global reactor that should be imported to use the Twisted reactor. -It's cast to ReactorProtocol, our own type that stubs the necessary Twisted zope interfaces, to aid typing. -""" -reactor = cast(ReactorProtocol, twisted_reactor) +# Internal variable that should NOT be accessed directly. +_reactor: ReactorProtocol | None = None + + +def get_global_reactor() -> ReactorProtocol: + """ + Get the global Twisted reactor. It should be the only way to get a reactor, other than using the instance that + is passed around (which should be the same instance as the one returned by this function). + + This function must NOT be called in the module-level, only inside other functions. + """ + global _reactor + + if _reactor is None: + raise Exception('The reactor is not initialized. Use `initialize_global_reactor()`.') + + return _reactor + + +def initialize_global_reactor(*, use_asyncio_reactor: bool = False) -> ReactorProtocol: + """ + Initialize the global Twisted reactor. Must ony be called once. + This function must NOT be called in the module-level, only inside other functions. + """ + global _reactor + + if _reactor is not None: + log = logger.new() + log.warn('The reactor has already been initialized. Use `get_global_reactor()`.') + return _reactor + + if use_asyncio_reactor: + import asyncio + import sys + + from twisted.internet import asyncioreactor + from twisted.internet.error import ReactorAlreadyInstalledError + + if sys.platform == 'win32': + # See: https://docs.twistedmatrix.com/en/twisted-22.10.0/api/twisted.internet.asyncioreactor.AsyncioSelectorReactor.html # noqa: E501 + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + + try: + asyncioreactor.install(asyncio.get_event_loop()) + except ReactorAlreadyInstalledError as e: + msg = ( + "There's a Twisted reactor installed already. It's probably the default one, installed indirectly by " + "one of our imports. This can happen, for example, if we import from the hathor module in " + "entrypoint-level, like in CLI tools other than `RunNode`." + ) + raise Exception(msg) from e + + from twisted.internet import reactor as twisted_reactor + + assert verifyObject(IReactorTime, twisted_reactor) is True + assert verifyObject(IReactorCore, twisted_reactor) is True + assert verifyObject(IReactorTCP, twisted_reactor) is True + + # We cast to ReactorProtocol, our own type that stubs the necessary Twisted zope interfaces, to aid typing. + _reactor = cast(ReactorProtocol, twisted_reactor) + return _reactor diff --git a/hathor/stratum/stratum.py b/hathor/stratum/stratum.py index a044de4a0..5d4085569 100644 --- a/hathor/stratum/stratum.py +++ b/hathor/stratum/stratum.py @@ -38,9 +38,10 @@ from hathor.exception import InvalidNewTransaction from hathor.p2p.utils import format_address from hathor.pubsub import EventArguments, HathorEvents +from hathor.reactor import ReactorProtocol as Reactor from hathor.transaction import BaseTransaction, BitcoinAuxPow, Block, MergeMinedBlock, Transaction, sum_weights from hathor.transaction.exceptions import PowError, ScriptError, TxValidationError -from hathor.util import Reactor, json_dumpb, json_loadb, reactor +from hathor.util import json_dumpb, json_loadb from hathor.verification.vertex_verifier import VertexVerifier from hathor.wallet.exceptions import InvalidAddress @@ -735,7 +736,7 @@ class StratumFactory(ServerFactory): mined_txs: dict[bytes, Transaction] deferreds_tx: dict[bytes, Deferred] - def __init__(self, manager: 'HathorManager', reactor: Reactor = reactor): + def __init__(self, manager: 'HathorManager', reactor: Reactor): self.log = logger.new() self.manager = manager self.reactor = reactor @@ -824,7 +825,7 @@ class StratumClient(JSONRPC): address: Optional[bytes] - def __init__(self, proc_count: Optional[int] = None, address: Optional[bytes] = None, + def __init__(self, reactor: Reactor, proc_count: Optional[int] = None, address: Optional[bytes] = None, id_generator: Optional[Callable[[], Iterator[Union[str, int]]]] = lambda: count()): self.log = logger.new() self.job_data = MinerJob() @@ -836,24 +837,23 @@ def __init__(self, proc_count: Optional[int] = None, address: Optional[bytes] = self.loop = None self.address = address self._iter_id = id_generator and id_generator() or None + self.reactor = reactor def _next_id(self): if self._iter_id: return str(next(self._iter_id)) - def start(self, clock: Optional[Reactor] = None) -> None: + def start(self) -> None: """ Starts the client, instantiating mining processes and scheduling miner supervisor calls. """ - if clock is None: - clock = reactor args = (self.job_data, self.signal, self.queue) proc_count = self.proc_count or cast(int, cpu_count()) self.signal.value = self.SLEEP self.miners = [Process(target=miner_job, args=(i, proc_count, *args)) for i in range(proc_count)] self.loop = task.LoopingCall(supervisor_job, self) - self.loop.clock = clock + self.loop.clock = self.reactor self.loop.start(self.SUPERVISOR_LOOP_INTERVAL) for miner in self.miners: diff --git a/hathor/transaction/storage/cache_storage.py b/hathor/transaction/storage/cache_storage.py index 0cc41d359..8a1937a03 100644 --- a/hathor/transaction/storage/cache_storage.py +++ b/hathor/transaction/storage/cache_storage.py @@ -18,11 +18,11 @@ from twisted.internet import threads from hathor.indexes import IndexesManager +from hathor.reactor import ReactorProtocol as Reactor from hathor.transaction import BaseTransaction from hathor.transaction.storage.migrations import MigrationState from hathor.transaction.storage.transaction_storage import BaseTransactionStorage from hathor.transaction.storage.tx_allow_scope import TxAllowScope -from hathor.util import Reactor class TransactionCacheStorage(BaseTransactionStorage): diff --git a/hathor/util.py b/hathor/util.py index 3e1b910db..20fd9e995 100644 --- a/hathor/util.py +++ b/hathor/util.py @@ -30,8 +30,6 @@ import hathor from hathor.conf.get_settings import get_settings -from hathor.reactor.reactor import reactor as hathor_reactor -from hathor.reactor.reactor_protocol import ReactorProtocol from hathor.types import TokenUid if TYPE_CHECKING: @@ -39,11 +37,6 @@ from hathor.transaction.base_transaction import BaseTransaction -# TODO: Those reexports are kept for retro-compatibility, but users could import them directly and then we can remove -# them from this file. -Reactor = ReactorProtocol -reactor = hathor_reactor - logger = get_logger() T = TypeVar('T') diff --git a/hathor/wallet/base_wallet.py b/hathor/wallet/base_wallet.py index 913c13bc7..ec5e2dc5e 100644 --- a/hathor/wallet/base_wallet.py +++ b/hathor/wallet/base_wallet.py @@ -27,13 +27,13 @@ from hathor.conf import HathorSettings from hathor.crypto.util import decode_address from hathor.pubsub import EventArguments, HathorEvents, PubSubManager +from hathor.reactor import ReactorProtocol as Reactor, get_global_reactor from hathor.transaction import BaseTransaction, Block, TxInput, TxOutput from hathor.transaction.base_transaction import int_to_bytes from hathor.transaction.scripts import P2PKH, create_output_script, parse_address_script from hathor.transaction.storage import TransactionStorage from hathor.transaction.transaction import Transaction from hathor.types import AddressB58, Amount, TokenUid -from hathor.util import Reactor from hathor.wallet.exceptions import InputDuplicated, InsufficientFunds, PrivateKeyNotFound settings = HathorSettings() @@ -129,8 +129,7 @@ def __init__(self, directory: str = './', pubsub: Optional[PubSubManager] = None ] if reactor is None: - from hathor.util import reactor as twisted_reactor - reactor = twisted_reactor + reactor = get_global_reactor() self.reactor = reactor def _manually_initialize(self) -> None: diff --git a/hathor/wallet/resources/thin_wallet/send_tokens.py b/hathor/wallet/resources/thin_wallet/send_tokens.py index 65136c1b6..c9bbf10ce 100644 --- a/hathor/wallet/resources/thin_wallet/send_tokens.py +++ b/hathor/wallet/resources/thin_wallet/send_tokens.py @@ -27,10 +27,11 @@ from hathor.cli.openapi_files.register import register_resource from hathor.conf.get_settings import get_settings from hathor.exception import InvalidNewTransaction +from hathor.reactor import get_global_reactor from hathor.transaction import Transaction from hathor.transaction.base_transaction import tx_or_block_from_bytes from hathor.transaction.exceptions import TxValidationError -from hathor.util import json_dumpb, json_loadb, reactor +from hathor.util import json_dumpb, json_loadb logger = get_logger() @@ -59,6 +60,7 @@ def __init__(self, manager): self.manager = manager self.sleep_seconds = 0 self.log = logger.new() + self.reactor = get_global_reactor() def render_POST(self, request: Request) -> Any: """ POST request for /thin_wallet/send_tokens/ @@ -177,7 +179,7 @@ def _render_POST(self, context: _Context) -> None: # Set parents tx.parents = self.manager.get_new_tx_parents(tx.timestamp) - deferred = threads.deferToThreadPool(reactor, self.manager.pow_thread_pool, + deferred = threads.deferToThreadPool(self.reactor, self.manager.pow_thread_pool, self._render_POST_thread, context) deferred.addCallback(self._cb_tx_resolve) deferred.addErrback(self._err_tx_resolve, context, 'python_resolve') @@ -204,7 +206,7 @@ def _stratum_deferred_resolve(self, context: _Context) -> None: # Delete it to avoid memory leak del self.manager.stratum_factory.mined_txs[funds_hash] - deferred = threads.deferToThreadPool(reactor, self.manager.pow_thread_pool, + deferred = threads.deferToThreadPool(self.reactor, self.manager.pow_thread_pool, self._stratum_thread_verify, context) deferred.addCallback(self._cb_tx_resolve) deferred.addErrback(self._err_tx_resolve, context, 'stratum_resolve') diff --git a/hathor/websocket/factory.py b/hathor/websocket/factory.py index 1a797189d..2c7aa2d16 100644 --- a/hathor/websocket/factory.py +++ b/hathor/websocket/factory.py @@ -25,7 +25,8 @@ from hathor.metrics import Metrics from hathor.p2p.rate_limiter import RateLimiter from hathor.pubsub import EventArguments, HathorEvents -from hathor.util import json_dumpb, json_loadb, json_loads, reactor +from hathor.reactor import get_global_reactor +from hathor.util import json_dumpb, json_loadb, json_loads from hathor.websocket.protocol import HathorAdminWebsocketProtocol settings = HathorSettings() @@ -89,6 +90,7 @@ def __init__(self, metrics: Optional[Metrics] = None, address_index: Optional[Ad :param metrics: If not given, a new one is created. :type metrics: :py:class:`hathor.metrics.Metrics` """ + self.reactor = get_global_reactor() # Opened websocket connections so I can broadcast messages later # It contains only connections that have finished handshaking. self.connections: set[HathorAdminWebsocketProtocol] = set() @@ -98,7 +100,7 @@ def __init__(self, metrics: Optional[Metrics] = None, address_index: Optional[Ad super().__init__() # Limit the send message rate for specific type of data - self.rate_limiter = RateLimiter(reactor=reactor) + self.rate_limiter = RateLimiter(reactor=self.reactor) # Stores the buffer of messages that exceeded the rate limit and will be sent self.buffer_deques: dict[str, deque[dict[str, Any]]] = {} @@ -111,7 +113,7 @@ def __init__(self, metrics: Optional[Metrics] = None, address_index: Optional[Ad # A timer to periodically broadcast dashboard metrics self._lc_send_metrics = LoopingCall(self._send_metrics) - self._lc_send_metrics.clock = reactor + self._lc_send_metrics.clock = self.reactor def start(self): self.is_running = True @@ -144,7 +146,7 @@ def _send_metrics(self): 'hash_rate': self.metrics.hash_rate, 'peers': self.metrics.connected_peers, 'type': 'dashboard:metrics', - 'time': reactor.seconds(), + 'time': self.reactor.seconds(), }) def subscribe(self, pubsub): @@ -277,8 +279,8 @@ def enqueue_for_later(self, data): self.buffer_deques[data['type']].append(data) if len(self.buffer_deques[data['type']]) == 1: # If it's the first time we hit the limit (only one message in deque), we schedule process_deque - reactor.callLater(CONTROLLED_TYPES[data['type']]['time_buffering'], self.process_deque, - data_type=data['type']) + self.reactor.callLater(CONTROLLED_TYPES[data['type']]['time_buffering'], self.process_deque, + data_type=data['type']) def process_deque(self, data_type): """ Process the deque and check if I have limit to send the messages now @@ -294,8 +296,8 @@ def process_deque(self, data_type): data['throttled'] = False self.send_message(data) else: - reactor.callLater(CONTROLLED_TYPES[data_type]['time_buffering'], self.process_deque, - data_type=data_type) + self.reactor.callLater(CONTROLLED_TYPES[data_type]['time_buffering'], self.process_deque, + data_type=data_type) break def handle_message(self, connection: HathorAdminWebsocketProtocol, data: Union[bytes, str]) -> None: diff --git a/tests/conftest.py b/tests/conftest.py index 475c5b59f..33fb90950 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,18 +1,14 @@ -import asyncio import os import sys -from twisted.internet import asyncioreactor - from hathor.conf import UNITTESTS_SETTINGS_FILEPATH +from hathor.reactor import initialize_global_reactor os.environ['HATHOR_CONFIG_YAML'] = os.environ.get('HATHOR_TEST_CONFIG_YAML', UNITTESTS_SETTINGS_FILEPATH) if sys.platform == 'win32': - # See: https://twistedmatrix.com/documents/current/api/twisted.internet.asyncioreactor.AsyncioSelectorReactor.html - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - # XXX: because rocksdb isn't available on Windows, we force using memory-storage for tests so most of them can run os.environ['HATHOR_TEST_MEMORY_STORAGE'] = 'true' -asyncioreactor.install(asyncio.get_event_loop()) +# TODO: We should remove this call from the module level. +initialize_global_reactor(use_asyncio_reactor=True) diff --git a/tests/tx/test_stratum.py b/tests/tx/test_stratum.py index f059aacb8..47331a341 100644 --- a/tests/tx/test_stratum.py +++ b/tests/tx/test_stratum.py @@ -256,7 +256,7 @@ def setUp(self): storage = TransactionMemoryStorage() self.block = storage.get_transaction(self._settings.GENESIS_BLOCK_HASH) self.transport = StringTransportWithDisconnection() - self.protocol = StratumClient() + self.protocol = StratumClient(reactor=self.clock) self.protocol.makeConnection(self.transport) self.job_request_params = { 'data': self.block.get_header_without_nonce().hex(), diff --git a/tests/unittest.py b/tests/unittest.py index 68ae47ed3..852f27bd8 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -15,9 +15,10 @@ from hathor.daa import DifficultyAdjustmentAlgorithm, TestMode from hathor.p2p.peer_id import PeerId from hathor.p2p.sync_version import SyncVersion +from hathor.reactor import ReactorProtocol as Reactor, get_global_reactor from hathor.simulator.clock import MemoryReactorHeapClock from hathor.transaction import BaseTransaction -from hathor.util import Random, Reactor, reactor +from hathor.util import Random from hathor.wallet import HDWallet, Wallet from tests.test_memory_reactor_clock import TestMemoryReactorClock @@ -499,6 +500,7 @@ def clean_pending(self, required_to_quiesce=True): Copy from: https://github.com/zooko/pyutil/blob/master/pyutil/testutil.py#L68 """ + reactor = get_global_reactor() pending = reactor.getDelayedCalls() active = bool(pending) for p in pending: