Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a83773d
feat(events): remove --x-emit-load-events CLI flag
glevco Apr 13, 2023
2546b88
feat(events): implement node state related methods on EventStorage
glevco Apr 13, 2023
a2d046e
feat(events): implement load phase related methods on EventManager
glevco Apr 13, 2023
ec1eea6
feat(events): implement load phase event handling on HathorManager
glevco Apr 13, 2023
6ae1b3b
feat(events): update tests
glevco Apr 13, 2023
b8b3245
feat(events): implement event storage tests for new methods
glevco Apr 14, 2023
2a77957
feat(events): fix rocksdb event storage bugs
glevco Apr 14, 2023
8fb4f04
feat(events): fix bug on load phase events
glevco Apr 14, 2023
71bc1bd
feat(events): remove TODOs
glevco Apr 17, 2023
b9914c9
feat(events): lint code and improve method name
glevco Apr 17, 2023
5e86209
feat(events): add missing docstring and other minor changes
glevco Apr 18, 2023
f503b41
feat(events): lint code
glevco Apr 18, 2023
1201f53
feat(events): fix bug in load phase events
glevco Apr 19, 2023
93c7d3a
feat(events): remove wrong docstring
glevco Apr 19, 2023
2ae0d54
feat(events): remove emit-load-events tests from test_cli_builder
glevco Apr 20, 2023
faa2754
feat(events): implement event queue feature state methods in event st…
glevco Apr 26, 2023
8675e4c
feat(events): set event_storage on HathorManager
glevco Apr 26, 2023
1e787fe
feat(events): reorganize rebased code
glevco Apr 26, 2023
068b766
chore(builder): fix tests
glevco Apr 26, 2023
c5227cd
feat(events): reorganize event manager and storage initialization
glevco May 2, 2023
75f3d66
fix rebase conflicts
glevco May 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def __init__(self) -> None:

self._event_manager: Optional[EventManager] = None
self._event_ws_factory: Optional[EventWebsocketFactory] = None
self._enable_event_queue: Optional[bool] = None

self._rocksdb_path: Optional[str] = None
self._rocksdb_storage: Optional[RocksDBStorage] = None
Expand Down Expand Up @@ -128,7 +129,6 @@ def build(self) -> BuildArtifacts:
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub)

wallet = self._get_or_create_wallet()
event_storage = self._get_or_create_event_storage()
event_manager = self._get_or_create_event_manager()
tx_storage = self._get_or_create_tx_storage()
indexes = tx_storage.indexes
Expand Down Expand Up @@ -161,20 +161,22 @@ def build(self) -> BuildArtifacts:
if self._full_verification is not None:
kwargs['full_verification'] = self._full_verification

if self._enable_event_queue is not None:
kwargs['enable_event_queue'] = self._enable_event_queue

manager = HathorManager(
reactor,
pubsub=pubsub,
consensus_algorithm=consensus_algorithm,
peer_id=peer_id,
tx_storage=tx_storage,
event_storage=event_storage,
event_manager=event_manager,
network=self._network,
wallet=wallet,
rng=self._rng,
checkpoints=self._checkpoints,
capabilities=self._capabilities,
environment_info=get_environment_info(self._cmdline, peer_id.id),
event_manager=event_manager,
**kwargs
)

Expand Down Expand Up @@ -303,8 +305,8 @@ def _get_or_create_event_storage(self) -> EventStorage:

return self._event_storage

def _get_or_create_event_manager(self) -> Optional[EventManager]:
if self._event_manager is None and self._event_ws_factory is not None:
def _get_or_create_event_manager(self) -> EventManager:
if self._event_manager is None:
self._event_manager = EventManager(
reactor=self._get_reactor(),
pubsub=self._get_or_create_pubsub(),
Expand Down Expand Up @@ -389,6 +391,7 @@ def enable_wallet_index(self) -> 'Builder':

def enable_event_manager(self, *, event_ws_factory: EventWebsocketFactory) -> 'Builder':
self.check_if_can_modify()
self._enable_event_queue = True
self._event_ws_factory = event_ws_factory
return self

Expand Down
24 changes: 10 additions & 14 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,15 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa

pubsub = PubSubManager(reactor)

event_manager: Optional[EventManager] = None
if args.x_enable_event_queue:
self.event_ws_factory = EventWebsocketFactory(reactor, event_storage)
event_manager = EventManager(
event_storage=event_storage,
event_ws_factory=self.event_ws_factory,
pubsub=pubsub,
reactor=reactor,
emit_load_events=args.x_emit_load_events
)
else:
self.check_or_raise(not args.x_emit_load_events, '--x-emit-load-events cannot be used without '
'--x-enable-event-queue')

event_manager = EventManager(
event_storage=event_storage,
event_ws_factory=self.event_ws_factory,
pubsub=pubsub,
reactor=reactor
)

if args.wallet_index and tx_storage.indexes is not None:
self.log.debug('enable wallet indexes')
Expand All @@ -174,7 +170,6 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
network=network,
hostname=hostname,
tx_storage=tx_storage,
event_storage=event_storage,
event_manager=event_manager,
wallet=self.wallet,
stratum_port=args.stratum,
Expand All @@ -185,7 +180,8 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
enable_sync_v2=enable_sync_v2,
consensus_algorithm=consensus_algorithm,
environment_info=get_environment_info(args=str(args), peer_id=peer_id.id),
full_verification=full_verification
full_verification=full_verification,
enable_event_queue=bool(args.x_enable_event_queue)
)

if args.data:
Expand Down Expand Up @@ -232,7 +228,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa

self.manager.enable_event_queue = True
self.log.info('--x-enable-event-queue flag provided. '
'The events detected by the full node will be stored and retrieved to clients')
'The events detected by the full node will be stored and can be retrieved by clients')

for description in args.listen:
self.manager.add_listen_address(description)
Expand Down
2 changes: 1 addition & 1 deletion hathor/builder/resources_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def create_resources(self, args: Namespace) -> server.Site:
ws_factory.subscribe(self.manager.pubsub)

# Event websocket resource
if args.x_enable_event_queue and self.event_ws_factory is not None:
if args.x_enable_event_queue:
root.putChild(b'event_ws', WebSocketResource(self.event_ws_factory))
root.putChild(b'event', EventResource(self.manager._event_manager))

Expand Down
2 changes: 0 additions & 2 deletions hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ def create_parser(cls) -> ArgumentParser:
parser.add_argument('--x-localhost-only', action='store_true', help='Only connect to peers on localhost')
parser.add_argument('--x-rocksdb-indexes', action='store_true', help=SUPPRESS)
parser.add_argument('--x-enable-event-queue', action='store_true', help='Enable event queue mechanism')
parser.add_argument('--x-emit-load-events', action='store_true', help='Enable emission of events during the '
'LOAD phase')
parser.add_argument('--peer-id-blacklist', action='extend', default=[], nargs='+', type=str,
help='Peer IDs to forbid connection')
return parser
Expand Down
86 changes: 65 additions & 21 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable, Optional
from typing import Callable, Iterator, Optional

from structlog import get_logger

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.event_type import EventType
from hathor.event.model.node_state import NodeState
from hathor.event.storage import EventStorage
from hathor.event.websocket import EventWebsocketFactory
from hathor.pubsub import EventArguments, HathorEvents, PubSubManager
from hathor.transaction import BaseTransaction
from hathor.util import Reactor

logger = get_logger()
Expand Down Expand Up @@ -51,7 +53,9 @@ class EventManager:

_peer_id: str
_is_running: bool = False
_load_finished: bool = False
_previous_node_state: Optional[NodeState] = None
_last_event: Optional[BaseEvent] = None
_last_existing_group_id: Optional[int] = None

@property
def event_storage(self) -> EventStorage:
Expand All @@ -60,34 +64,40 @@ def event_storage(self) -> EventStorage:
def __init__(
self,
event_storage: EventStorage,
event_ws_factory: EventWebsocketFactory,
pubsub: PubSubManager,
reactor: Reactor,
emit_load_events: bool = False
event_ws_factory: Optional[EventWebsocketFactory] = None,
):
self.log = logger.new()

self._clock = reactor
self._reactor = reactor
self._event_storage = event_storage
self._event_ws_factory = event_ws_factory
self._pubsub = pubsub
self.emit_load_events = emit_load_events

self._last_event = self._event_storage.get_last_event()
self._last_existing_group_id = self._event_storage.get_last_group_id()
def start(self, peer_id: str) -> None:
assert self._is_running is False, 'Cannot start, EventManager is already running'
assert self._event_ws_factory is not None, 'Cannot start, EventWebsocketFactory is not set'
assert self.get_event_queue_state() is True, 'Cannot start, event queue feature is disabled'

self._previous_node_state = self._event_storage.get_node_state()

if self._should_reload_events():
self._event_storage.clear_events()
else:
self._last_event = self._event_storage.get_last_event()
self._last_existing_group_id = self._event_storage.get_last_group_id()

self._assert_closed_event_group()
self._subscribe_events()

def start(self, peer_id: str) -> None:
assert self._is_running is False, 'Cannot start, EventManager is already running'

self._peer_id = peer_id
self._event_ws_factory.start()
self._is_running = True

def stop(self):
assert self._is_running is True, 'Cannot stop, EventManager is not running'
assert self._event_ws_factory is not None

self._event_ws_factory.stop()
self._is_running = False
Expand All @@ -110,25 +120,28 @@ def _subscribe_events(self):
""" Subscribe to defined events for the pubsub received
"""
for event in _SUBSCRIBE_EVENTS:
self._pubsub.subscribe(event, self._handle_event)
self._pubsub.subscribe(event, self._handle_hathor_event)

def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None:
event_type = EventType.from_hathor_event(hathor_event)

self._handle_event(event_type, event_args)

def _handle_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None:
def _handle_event(self, event_type: EventType, event_args: EventArguments) -> None:
assert self._is_running, 'Cannot handle event, EventManager is not started.'

event_type = EventType.from_hathor_event(hathor_event)
event_specific_handlers = {
EventType.LOAD_STARTED: self._handle_load_started,
EventType.LOAD_FINISHED: self._handle_load_finished
}

if event_specific_handler := event_specific_handlers.get(event_type):
event_specific_handler()

if not self._load_finished and not self.emit_load_events:
return

self._handle_event_creation(event_type, event_args)

def _handle_event_creation(self, event_type: EventType, event_args: EventArguments) -> None:
assert self._event_ws_factory is not None
create_event_fn: Callable[[EventType, EventArguments], BaseEvent]

if event_type in _GROUP_START_EVENTS:
Expand Down Expand Up @@ -181,9 +194,6 @@ def _create_non_group_edge_event(self, event_type: EventType, event_args: EventA
group_id=group_id,
)

def _handle_load_finished(self):
self._load_finished = True

def _create_event(
self,
event_type: EventType,
Expand All @@ -193,8 +203,42 @@ def _create_event(
return BaseEvent.from_event_arguments(
event_id=0 if self._last_event is None else self._last_event.id + 1,
peer_id=self._peer_id,
timestamp=self._clock.seconds(),
timestamp=self._reactor.seconds(),
event_type=event_type,
event_args=event_args,
group_id=group_id,
)

def _handle_load_started(self):
self._event_storage.save_node_state(NodeState.LOAD)

def _handle_load_finished(self):
self._event_storage.save_node_state(NodeState.SYNC)

def _should_reload_events(self) -> bool:
return self._previous_node_state in [None, NodeState.LOAD]

def get_event_queue_state(self) -> bool:
"""Get whether the event queue feature is enabled from the storage"""
return self._event_storage.get_event_queue_state()

def save_event_queue_state(self, state: bool) -> None:
self._event_storage.save_event_queue_state(state)

def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransaction]) -> None:
"""
Either generates load phase events or not, depending on previous node state.
Does so asynchronously so events generated here are not processed before normal event handling.
"""
assert self._is_running, 'Cannot handle load phase events, EventManager is not started.'

if not self._should_reload_events():
return

for vertex in topological_iterator:
self._reactor.callLater(
delay=0,
callable=self._handle_event,
event_type=EventType.NEW_VERTEX_ACCEPTED,
event_args=EventArguments(tx=vertex)
)
20 changes: 20 additions & 0 deletions hathor/event/model/node_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum


class NodeState(Enum):
LOAD = 0
SYNC = 1
26 changes: 26 additions & 0 deletions hathor/event/storage/event_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import Iterator, Optional

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.node_state import NodeState


class EventStorage(ABC):
Expand Down Expand Up @@ -43,3 +44,28 @@ def get_last_group_id(self) -> Optional[int]:
def iter_from_event(self, key: int) -> Iterator[BaseEvent]:
""" Iterate through events starting from the event with the given key"""
raise NotImplementedError

@abstractmethod
def clear_events(self) -> None:
"""Clear all stored events and related metadata."""
raise NotImplementedError

@abstractmethod
def save_node_state(self, state: NodeState) -> None:
"""Save a node state in the storage"""
raise NotImplementedError

@abstractmethod
def get_node_state(self) -> Optional[NodeState]:
"""Get the node state from the storage"""
raise NotImplementedError

@abstractmethod
def save_event_queue_state(self, enabled: bool) -> None:
"""Save whether the event queue feature is enabled in the storage"""
raise NotImplementedError

@abstractmethod
def get_event_queue_state(self) -> bool:
"""Get whether the event queue feature is enabled from the storage"""
raise NotImplementedError
20 changes: 20 additions & 0 deletions hathor/event/storage/memory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import Iterator, List, Optional

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.node_state import NodeState
from hathor.event.storage.event_storage import EventStorage


Expand All @@ -23,6 +24,8 @@ def __init__(self) -> None:
self._events: List[BaseEvent] = []
self._last_event: Optional[BaseEvent] = None
self._last_group_id: Optional[int] = None
self._node_state: Optional[NodeState] = None
self._event_queue_enabled: bool = False

def save_event(self, event: BaseEvent) -> None:
if event.id != len(self._events):
Expand Down Expand Up @@ -54,3 +57,20 @@ def iter_from_event(self, key: int) -> Iterator[BaseEvent]:
while key < len(self._events):
yield self._events[key]
key += 1

def clear_events(self) -> None:
self._events = []
self._last_event = None
self._last_group_id = None

def save_node_state(self, state: NodeState) -> None:
self._node_state = state

def get_node_state(self) -> Optional[NodeState]:
return self._node_state

def save_event_queue_state(self, enabled: bool) -> None:
self._event_queue_enabled = enabled

def get_event_queue_state(self) -> bool:
return self._event_queue_enabled
Loading