From e84f95da70093687798544fb9ec1e379208d1cbf Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Mon, 18 Dec 2023 19:49:54 -0300 Subject: [PATCH] refactor(reactor): implement option to use asyncio reactor --- hathor/builder/cli_builder.py | 1 + hathor/cli/run_node.py | 9 +++++--- hathor/cli/run_node_args.py | 1 + hathor/reactor/__init__.py | 3 ++- hathor/reactor/reactor.py | 41 +++++++++++++++++++++++++++++++++++ tests/conftest.py | 10 +++------ 6 files changed, 54 insertions(+), 11 deletions(-) diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index 3e6917719..8d3463176 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -100,6 +100,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager: python=python, platform=platform.platform(), settings=settings_source, + reactor_type=type(reactor).__name__, ) tx_storage: TransactionStorage diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py index eb2ba7b88..55b9c1730 100644 --- a/hathor/cli/run_node.py +++ b/hathor/cli/run_node.py @@ -33,7 +33,8 @@ class RunNode: ('--enable-crash-api', lambda args: bool(args.enable_crash_api)), ('--x-sync-bridge', lambda args: bool(args.x_sync_bridge)), ('--x-sync-v2-only', lambda args: bool(args.x_sync_v2_only)), - ('--x-enable-event-queue', lambda args: bool(args.x_enable_event_queue)) + ('--x-enable-event-queue', lambda args: bool(args.x_enable_event_queue)), + ('--x-asyncio-reactor', lambda args: bool(args.x_asyncio_reactor)) ] @classmethod @@ -117,6 +118,8 @@ def create_parser(cls) -> ArgumentParser: help=f'Signal support for a feature. One of {possible_features}') parser.add_argument('--signal-not-support', default=[], action='append', choices=possible_features, help=f'Signal not support for a feature. One of {possible_features}') + parser.add_argument('--x-asyncio-reactor', action='store_true', + help='Use asyncio reactor instead of Twisted\'s default.') return parser def prepare(self, *, register_resources: bool = True) -> None: @@ -138,8 +141,8 @@ def prepare(self, *, register_resources: bool = True) -> None: self.check_unsafe_arguments() self.check_python_version() - from hathor.reactor import get_global_reactor - reactor = get_global_reactor() + from hathor.reactor import initialize_global_reactor + reactor = initialize_global_reactor(use_asyncio_reactor=self._args.x_asyncio_reactor) self.reactor = reactor from hathor.builder import CliBuilder, ResourcesBuilder diff --git a/hathor/cli/run_node_args.py b/hathor/cli/run_node_args.py index d6d993b0f..897555fbb 100644 --- a/hathor/cli/run_node_args.py +++ b/hathor/cli/run_node_args.py @@ -72,3 +72,4 @@ class RunNodeArgs(BaseModel, extra=Extra.allow): config_yaml: Optional[str] signal_support: set[Feature] signal_not_support: set[Feature] + x_asyncio_reactor: bool diff --git a/hathor/reactor/__init__.py b/hathor/reactor/__init__.py index 932f52ca3..d87649a98 100644 --- a/hathor/reactor/__init__.py +++ b/hathor/reactor/__init__.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from hathor.reactor.reactor import get_global_reactor +from hathor.reactor.reactor import get_global_reactor, initialize_global_reactor from hathor.reactor.reactor_protocol import ReactorProtocol __all__ = [ + 'initialize_global_reactor', 'get_global_reactor', 'ReactorProtocol', ] diff --git a/hathor/reactor/reactor.py b/hathor/reactor/reactor.py index efdbaebbd..e94dc3a97 100644 --- a/hathor/reactor/reactor.py +++ b/hathor/reactor/reactor.py @@ -14,11 +14,14 @@ from typing import cast +from structlog import get_logger from twisted.internet.interfaces import IReactorCore, IReactorTCP, IReactorTime from zope.interface.verify import verifyObject from hathor.reactor.reactor_protocol import ReactorProtocol +logger = get_logger() + # Internal variable that should NOT be accessed directly. _reactor: ReactorProtocol | None = None @@ -27,12 +30,50 @@ def get_global_reactor() -> ReactorProtocol: """ Get the global Twisted reactor. It should be the only way to get a reactor, other than using the instance that is passed around (which should be the same instance as the one returned by this function). + + This function must NOT be called in the module-level, only inside other functions. + """ + global _reactor + + if _reactor is None: + raise Exception('The reactor is not initialized. Use `initialize_global_reactor()`.') + + return _reactor + + +def initialize_global_reactor(*, use_asyncio_reactor: bool = False) -> ReactorProtocol: + """ + Initialize the global Twisted reactor. Must ony be called once. + This function must NOT be called in the module-level, only inside other functions. """ global _reactor if _reactor is not None: + log = logger.new() + log.warn('The reactor has already been initialized. Use `get_global_reactor()`.') return _reactor + if use_asyncio_reactor: + import asyncio + import sys + + from twisted.internet import asyncioreactor + from twisted.internet.error import ReactorAlreadyInstalledError + + if sys.platform == 'win32': + # See: https://docs.twistedmatrix.com/en/twisted-22.10.0/api/twisted.internet.asyncioreactor.AsyncioSelectorReactor.html # noqa: E501 + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + + try: + asyncioreactor.install(asyncio.get_event_loop()) + except ReactorAlreadyInstalledError as e: + msg = ( + "There's a Twisted reactor installed already. It's probably the default one, installed indirectly by " + "one of our imports. This can happen, for example, if we import from the hathor module in " + "entrypoint-level, like in CLI tools other than `RunNode`." + ) + raise Exception(msg) from e + from twisted.internet import reactor as twisted_reactor assert verifyObject(IReactorTime, twisted_reactor) is True diff --git a/tests/conftest.py b/tests/conftest.py index 475c5b59f..33fb90950 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,18 +1,14 @@ -import asyncio import os import sys -from twisted.internet import asyncioreactor - from hathor.conf import UNITTESTS_SETTINGS_FILEPATH +from hathor.reactor import initialize_global_reactor os.environ['HATHOR_CONFIG_YAML'] = os.environ.get('HATHOR_TEST_CONFIG_YAML', UNITTESTS_SETTINGS_FILEPATH) if sys.platform == 'win32': - # See: https://twistedmatrix.com/documents/current/api/twisted.internet.asyncioreactor.AsyncioSelectorReactor.html - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - # XXX: because rocksdb isn't available on Windows, we force using memory-storage for tests so most of them can run os.environ['HATHOR_TEST_MEMORY_STORAGE'] = 'true' -asyncioreactor.install(asyncio.get_event_loop()) +# TODO: We should remove this call from the module level. +initialize_global_reactor(use_asyncio_reactor=True)