Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
python=python,
platform=platform.platform(),
settings=settings_source,
reactor_type=type(reactor).__name__,
)

tx_storage: TransactionStorage
Expand Down
9 changes: 6 additions & 3 deletions hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class RunNode:
('--enable-crash-api', lambda args: bool(args.enable_crash_api)),
('--x-sync-bridge', lambda args: bool(args.x_sync_bridge)),
('--x-sync-v2-only', lambda args: bool(args.x_sync_v2_only)),
('--x-enable-event-queue', lambda args: bool(args.x_enable_event_queue))
('--x-enable-event-queue', lambda args: bool(args.x_enable_event_queue)),
('--x-asyncio-reactor', lambda args: bool(args.x_asyncio_reactor))
]

@classmethod
Expand Down Expand Up @@ -117,6 +118,8 @@ def create_parser(cls) -> ArgumentParser:
help=f'Signal support for a feature. One of {possible_features}')
parser.add_argument('--signal-not-support', default=[], action='append', choices=possible_features,
help=f'Signal not support for a feature. One of {possible_features}')
parser.add_argument('--x-asyncio-reactor', action='store_true',
help='Use asyncio reactor instead of Twisted\'s default.')
return parser

def prepare(self, *, register_resources: bool = True) -> None:
Expand All @@ -138,8 +141,8 @@ def prepare(self, *, register_resources: bool = True) -> None:
self.check_unsafe_arguments()
self.check_python_version()

from hathor.reactor import get_global_reactor
reactor = get_global_reactor()
from hathor.reactor import initialize_global_reactor
reactor = initialize_global_reactor(use_asyncio_reactor=self._args.x_asyncio_reactor)
self.reactor = reactor

from hathor.builder import CliBuilder, ResourcesBuilder
Expand Down
1 change: 1 addition & 0 deletions hathor/cli/run_node_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@ class RunNodeArgs(BaseModel, extra=Extra.allow):
config_yaml: Optional[str]
signal_support: set[Feature]
signal_not_support: set[Feature]
x_asyncio_reactor: bool
3 changes: 2 additions & 1 deletion hathor/reactor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from hathor.reactor.reactor import get_global_reactor
from hathor.reactor.reactor import get_global_reactor, initialize_global_reactor
from hathor.reactor.reactor_protocol import ReactorProtocol

__all__ = [
'initialize_global_reactor',
'get_global_reactor',
'ReactorProtocol',
]
41 changes: 41 additions & 0 deletions hathor/reactor/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

from typing import cast

from structlog import get_logger
from twisted.internet.interfaces import IReactorCore, IReactorTCP, IReactorTime
from zope.interface.verify import verifyObject

from hathor.reactor.reactor_protocol import ReactorProtocol

logger = get_logger()

# Internal variable that should NOT be accessed directly.
_reactor: ReactorProtocol | None = None

Expand All @@ -27,12 +30,50 @@ def get_global_reactor() -> ReactorProtocol:
"""
Get the global Twisted reactor. It should be the only way to get a reactor, other than using the instance that
is passed around (which should be the same instance as the one returned by this function).

This function must NOT be called in the module-level, only inside other functions.
"""
global _reactor

if _reactor is None:
raise Exception('The reactor is not initialized. Use `initialize_global_reactor()`.')

return _reactor


def initialize_global_reactor(*, use_asyncio_reactor: bool = False) -> ReactorProtocol:
"""
Initialize the global Twisted reactor. Must ony be called once.
This function must NOT be called in the module-level, only inside other functions.
"""
global _reactor

if _reactor is not None:
log = logger.new()
log.warn('The reactor has already been initialized. Use `get_global_reactor()`.')
return _reactor

if use_asyncio_reactor:
import asyncio
import sys

from twisted.internet import asyncioreactor
from twisted.internet.error import ReactorAlreadyInstalledError

if sys.platform == 'win32':
# See: https://docs.twistedmatrix.com/en/twisted-22.10.0/api/twisted.internet.asyncioreactor.AsyncioSelectorReactor.html # noqa: E501
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

try:
asyncioreactor.install(asyncio.get_event_loop())
except ReactorAlreadyInstalledError as e:
msg = (
"There's a Twisted reactor installed already. It's probably the default one, installed indirectly by "
"one of our imports. This can happen, for example, if we import from the hathor module in "
"entrypoint-level, like in CLI tools other than `RunNode`."
)
raise Exception(msg) from e

from twisted.internet import reactor as twisted_reactor

assert verifyObject(IReactorTime, twisted_reactor) is True
Expand Down
10 changes: 3 additions & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import asyncio
import os
import sys

from twisted.internet import asyncioreactor

from hathor.conf import UNITTESTS_SETTINGS_FILEPATH
from hathor.reactor import initialize_global_reactor

os.environ['HATHOR_CONFIG_YAML'] = os.environ.get('HATHOR_TEST_CONFIG_YAML', UNITTESTS_SETTINGS_FILEPATH)

if sys.platform == 'win32':
# See: https://twistedmatrix.com/documents/current/api/twisted.internet.asyncioreactor.AsyncioSelectorReactor.html
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

# XXX: because rocksdb isn't available on Windows, we force using memory-storage for tests so most of them can run
os.environ['HATHOR_TEST_MEMORY_STORAGE'] = 'true'

asyncioreactor.install(asyncio.get_event_loop())
# TODO: We should remove this call from the module level.
initialize_global_reactor(use_asyncio_reactor=True)