Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from typing import Any, Optional

from structlog import get_logger
from twisted.internet.posixbase import PosixReactorBase

from hathor.cli.run_node import RunNodeArgs
from hathor.consensus import ConsensusAlgorithm
Expand All @@ -35,7 +34,7 @@
from hathor.p2p.utils import discover_hostname
from hathor.pubsub import PubSubManager
from hathor.stratum import StratumFactory
from hathor.util import Random
from hathor.util import Random, Reactor
from hathor.wallet import BaseWallet, HDWallet, Wallet

logger = get_logger()
Expand All @@ -55,7 +54,7 @@ def check_or_raise(self, condition: bool, message: str) -> None:
if not condition:
raise BuilderError(message)

def create_manager(self, reactor: PosixReactorBase) -> HathorManager:
def create_manager(self, reactor: Reactor) -> HathorManager:
import hathor
from hathor.conf import HathorSettings
from hathor.conf.get_settings import get_settings_source
Expand Down
2 changes: 2 additions & 0 deletions hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def prepare(self, *, register_resources: bool = True) -> None:
self.start_manager()

if self._args.stratum:
assert self.manager.stratum_factory is not None
self.reactor.listenTCP(self._args.stratum, self.manager.stratum_factory)

from hathor.conf import HathorSettings
Expand All @@ -170,6 +171,7 @@ def prepare(self, *, register_resources: bool = True) -> None:
)
status_server = resources_builder.build()
if self._args.status:
assert status_server is not None
self.reactor.listenTCP(self._args.status, status_server)

from hathor.builder.builder import BuildArtifacts
Expand Down
2 changes: 1 addition & 1 deletion hathor/consensus/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def _unsafe_update(self, base: BaseTransaction) -> None:
prev_block_tip=best_tip.hex(), new_block_tip=new_best_tip.hex())
# XXX: this method will mark as INVALID all transactions in the mempool that became invalid because of a
# reward lock
to_remove = storage.compute_transactions_that_became_invalid()
to_remove = storage.compute_transactions_that_became_invalid(new_best_height)
if to_remove:
self.log.warn('some transactions on the mempool became invalid and will be removed',
count=len(to_remove))
Expand Down
8 changes: 6 additions & 2 deletions hathor/debug_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

from structlog import get_logger
from twisted.internet import defer
from twisted.internet.interfaces import IReactorFromThreads
from twisted.web.http import Request

from hathor.api_util import Resource, get_arg_default, get_args
from hathor.cli.openapi_files.register import register_resource
from hathor.exception import HathorError
from hathor.manager import HathorManager
from hathor.util import reactor
from hathor.utils.zope import asserted_cast

logger = get_logger()

Expand Down Expand Up @@ -61,7 +63,8 @@ def render_GET(self, request: Request) -> bytes:
assert exc_cls_name in self.exc_class_map
exc_cls = self.exc_class_map[exc_cls_name]
msg = get_arg_default(raw_args, 'msg', self.default_msg)
reactor.callFromThread(self.run, exc_cls, msg)
threaded_reactor = asserted_cast(IReactorFromThreads, reactor)
threaded_reactor.callFromThread(self.run, exc_cls, msg)
return b'OK: no side-effects\n'


Expand Down Expand Up @@ -185,7 +188,8 @@ def render_GET(self, request: Request) -> bytes:
mess = get_arg_default(get_args(request), 'with', self.default_mess)
assert mess in self.mess_map
mess_func = self.mess_map[mess]
reactor.callFromThread(mess_func)
threaded_reactor = asserted_cast(IReactorFromThreads, reactor)
threaded_reactor.callFromThread(mess_func)
return b'OK: database yanked, full-node will break\n'


Expand Down
7 changes: 4 additions & 3 deletions hathor/p2p/sync_v1/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
from hathor.transaction import BaseTransaction
from hathor.transaction.base_transaction import tx_or_block_from_bytes
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.util import Reactor, json_dumps, json_loads, verified_cast
from hathor.util import Reactor, json_dumps, json_loads
from hathor.utils.zope import asserted_cast

settings = HathorSettings()
logger = get_logger()
Expand Down Expand Up @@ -60,8 +61,8 @@ def __init__(self, node_sync: 'NodeSyncTimestamp'):
self.node_sync = node_sync
self.protocol: 'HathorProtocol' = node_sync.protocol
assert self.protocol.transport is not None
self.consumer = verified_cast(IConsumer, self.protocol.transport)

consumer = asserted_cast(IConsumer, self.protocol.transport)
self.consumer = consumer
self.is_running: bool = False
self.is_producing: bool = False

Expand Down
5 changes: 3 additions & 2 deletions hathor/p2p/sync_v2/streamers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from hathor.transaction import BaseTransaction, Block, Transaction
from hathor.transaction.storage.traversal import BFSOrderWalk
from hathor.util import verified_cast
from hathor.utils.zope import asserted_cast

if TYPE_CHECKING:
from hathor.p2p.protocol import HathorProtocol
Expand Down Expand Up @@ -60,7 +60,8 @@ def __init__(self, node_sync: 'NodeBlockSync', *, limit: int = DEFAULT_STREAMING
self.node_sync = node_sync
self.protocol: 'HathorProtocol' = node_sync.protocol
assert self.protocol.transport is not None
self.consumer = verified_cast(IConsumer, self.protocol.transport)
consumer = asserted_cast(IConsumer, self.protocol.transport)
self.consumer = consumer

self.counter = 0
self.limit = limit
Expand Down
30 changes: 14 additions & 16 deletions hathor/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

from collections import defaultdict, deque
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, cast
from typing import TYPE_CHECKING, Any, Callable

from twisted.internet.interfaces import IReactorFromThreads
from twisted.python.threadable import isInIOThread

from hathor.util import Reactor, ReactorThread
from hathor.util import Reactor
from hathor.utils.zope import verified_cast

if TYPE_CHECKING:
from hathor.transaction import BaseTransaction, Block
Expand Down Expand Up @@ -187,7 +189,7 @@ def unsubscribe(self, key: HathorEvents, fn: PubSubCallable) -> None:
if fn in self._subscribers[key]:
self._subscribers[key].remove(fn)

def _call_next(self):
def _call_next(self) -> None:
"""Execute next call if it exists."""
if not self.queue:
return
Expand All @@ -196,19 +198,17 @@ def _call_next(self):
if self.queue:
self._schedule_call_next()

def _schedule_call_next(self):
def _schedule_call_next(self) -> None:
"""Schedule next call's execution."""
reactor_thread = ReactorThread.get_current_thread(self.reactor)
if reactor_thread == ReactorThread.MAIN_THREAD:
self.reactor.callLater(0, self._call_next)
elif reactor_thread == ReactorThread.NOT_MAIN_THREAD:
# XXX: does this always hold true? an assert could be tricky because it is a zope.interface
reactor = cast(IReactorFromThreads, self.reactor)
assert self.reactor.running

if not isInIOThread() and (threaded_reactor := verified_cast(IReactorFromThreads, self.reactor)):
# We're taking a conservative approach, since not all functions might need to run
# on the main thread [yan 2019-02-20]
reactor.callFromThread(self._call_next)
else:
raise NotImplementedError
threaded_reactor.callFromThread(self._call_next)
return

self.reactor.callLater(0, self._call_next)

def publish(self, key: HathorEvents, **kwargs: Any) -> None:
"""Publish a new event.
Expand All @@ -219,11 +219,9 @@ def publish(self, key: HathorEvents, **kwargs: Any) -> None:
:param **kwargs: Named arguments to be given to the functions that will be called with this event.
:type **kwargs: dict
"""
reactor_thread = ReactorThread.get_current_thread(self.reactor)

args = EventArguments(**kwargs)
for fn in self._subscribers[key]:
if reactor_thread == ReactorThread.NOT_RUNNING:
if not self.reactor.running:
fn(key, args)
else:
is_empty = bool(not self.queue)
Expand Down
Empty file added hathor/reactor/__init__.py
Empty file.
31 changes: 31 additions & 0 deletions hathor/reactor/reactor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import cast

from twisted.internet import reactor as twisted_reactor
from twisted.internet.interfaces import IReactorCore, IReactorTCP, IReactorTime
from zope.interface.verify import verifyObject

from hathor.reactor.reactor_protocol import ReactorProtocol

assert verifyObject(IReactorTime, twisted_reactor) is True
assert verifyObject(IReactorCore, twisted_reactor) is True
assert verifyObject(IReactorTCP, twisted_reactor) is True

"""
This variable is the global reactor that should be imported to use the Twisted reactor.
It's cast to ReactorProtocol, our own type that stubs the necessary Twisted zope interfaces, to aid typing.
"""
reactor = cast(ReactorProtocol, twisted_reactor)
74 changes: 74 additions & 0 deletions hathor/reactor/reactor_core_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from typing import TYPE_CHECKING, Any, Callable, Optional, Protocol, Sequence

from twisted.internet.interfaces import IReactorCore
from zope.interface import implementer

if TYPE_CHECKING:
from twisted.internet.defer import Deferred


@implementer(IReactorCore)
class ReactorCoreProtocol(Protocol):
"""
A Python protocol that stubs Twisted's IReactorCore interface.
"""

running: bool

@abstractmethod
def resolve(self, name: str, timeout: Sequence[int]) -> 'Deferred[str]':
raise NotImplementedError

@abstractmethod
def run(self) -> None:
raise NotImplementedError

@abstractmethod
def stop(self) -> None:
raise NotImplementedError

@abstractmethod
def crash(self) -> None:
raise NotImplementedError

@abstractmethod
def iterate(self, delay: float) -> None:
raise NotImplementedError

@abstractmethod
def fireSystemEvent(self, eventType: str) -> None:
raise NotImplementedError

@abstractmethod
def addSystemEventTrigger(
self,
phase: str,
eventType: str,
callable: Callable[..., Any],
*args: object,
**kwargs: object,
) -> Any:
raise NotImplementedError

@abstractmethod
def removeSystemEventTrigger(self, triggerID: Any) -> None:
raise NotImplementedError

@abstractmethod
def callWhenRunning(self, callable: Callable[..., Any], *args: object, **kwargs: object) -> Optional[Any]:
raise NotImplementedError
31 changes: 31 additions & 0 deletions hathor/reactor/reactor_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Protocol

from hathor.reactor.reactor_core_protocol import ReactorCoreProtocol
from hathor.reactor.reactor_tcp_protocol import ReactorTCPProtocol
from hathor.reactor.reactor_time_protocol import ReactorTimeProtocol


class ReactorProtocol(
ReactorCoreProtocol,
ReactorTimeProtocol,
ReactorTCPProtocol,
Protocol,
):
"""
A Python protocol that represents the intersection of Twisted's IReactorCore+IReactorTime+IReactorTCP interfaces.
"""
pass
51 changes: 51 additions & 0 deletions hathor/reactor/reactor_tcp_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from typing import TYPE_CHECKING, Optional, Protocol

from twisted.internet.interfaces import IReactorTCP
from zope.interface import implementer

if TYPE_CHECKING:
from twisted.internet.interfaces import IConnector, IListeningPort
from twisted.internet.protocol import ClientFactory, ServerFactory


@implementer(IReactorTCP)
class ReactorTCPProtocol(Protocol):
"""
A Python protocol that stubs Twisted's IReactorTCP interface.
"""

@abstractmethod
def listenTCP(
self,
port: int,
factory: 'ServerFactory',
backlog: int = 0,
interface: str = ''
) -> 'IListeningPort':
raise NotImplementedError

@abstractmethod
def connectTCP(
self,
host: str,
port: int,
factory: 'ClientFactory',
timeout: float,
bindAddress: Optional[tuple[str, int]],
) -> 'IConnector':
raise NotImplementedError
Loading