Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def __init__(self) -> None:
self._force_memory_index: bool = False

self._event_manager: Optional[EventManager] = None
self._event_ws_factory: Optional[EventWebsocketFactory] = None
self._enable_event_queue: Optional[bool] = None

self._rocksdb_path: Optional[str] = None
Expand Down Expand Up @@ -374,11 +373,14 @@ def _get_or_create_event_storage(self) -> EventStorage:

def _get_or_create_event_manager(self) -> EventManager:
if self._event_manager is None:
reactor = self._get_reactor()
storage = self._get_or_create_event_storage()
factory = EventWebsocketFactory(reactor, storage)
self._event_manager = EventManager(
reactor=self._get_reactor(),
reactor=reactor,
pubsub=self._get_or_create_pubsub(),
event_storage=self._get_or_create_event_storage(),
event_ws_factory=self._event_ws_factory
event_storage=storage,
event_ws_factory=factory
)

return self._event_manager
Expand Down Expand Up @@ -460,10 +462,9 @@ def enable_wallet_index(self) -> 'Builder':
self.enable_tokens_index()
return self

def enable_event_manager(self, *, event_ws_factory: EventWebsocketFactory) -> 'Builder':
def enable_event_queue(self) -> 'Builder':
self.check_if_can_modify()
self._enable_event_queue = True
self._event_ws_factory = event_ws_factory
return self

def set_tx_storage(self, tx_storage: TransactionStorage) -> 'Builder':
Expand Down
50 changes: 33 additions & 17 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __init__(
self._pubsub = pubsub

def start(self, peer_id: str) -> None:
"""Starts the EventManager."""
assert self._is_running is False, 'Cannot start, EventManager is already running'
assert self._event_ws_factory is not None, 'Cannot start, EventWebsocketFactory is not set'
assert self.get_event_queue_state() is True, 'Cannot start, event queue feature is disabled'
Expand All @@ -95,40 +96,45 @@ def start(self, peer_id: str) -> None:
self._event_ws_factory.start()
self._is_running = True

def stop(self):
def stop(self) -> None:
"""Stops the EventManager."""
assert self._is_running is True, 'Cannot stop, EventManager is not running'
assert self._event_ws_factory is not None

self._event_ws_factory.stop()
self._is_running = False

def _assert_closed_event_group(self):
def _assert_closed_event_group(self) -> None:
# XXX: we must check that the last event either does not belong to an event group or that it just closed an
# event group, because we cannot resume an open group of events that wasn't properly closed before exit
assert (
self._event_group_is_closed()
), 'an unclosed event group was detected, which indicates the node crashed, cannot resume'
assert self._event_group_is_closed(), (
'an unclosed event group was detected, which indicates the node crashed, cannot resume'
)

def _event_group_is_closed(self):
def _event_group_is_closed(self) -> bool:
"""Returns whether the previous event group was properly closed, if there's one."""
return (
self._last_event is None or
self._last_event.group_id is None or
EventType(self._last_event.type) in _GROUP_END_EVENTS
)

def _subscribe_events(self):
def _subscribe_events(self) -> None:
""" Subscribe to defined events for the pubsub received
"""
for event in _SUBSCRIBE_EVENTS:
self._pubsub.subscribe(event, self._handle_hathor_event)

def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None:
"""Handles a PubSub 'HathorEvents' event."""
event_type = EventType.from_hathor_event(hathor_event)

self._handle_event(event_type, event_args)

def _handle_event(self, event_type: EventType, event_args: EventArguments) -> None:
"""Handles an Event Queue feature 'EventType' event."""
assert self._is_running, 'Cannot handle event, EventManager is not started.'
assert self._event_ws_factory is not None

event_specific_handlers = {
EventType.LOAD_STARTED: self._handle_load_started,
Expand All @@ -138,10 +144,15 @@ def _handle_event(self, event_type: EventType, event_args: EventArguments) -> No
if event_specific_handler := event_specific_handlers.get(event_type):
event_specific_handler()

self._handle_event_creation(event_type, event_args)
event = self._handle_event_creation(event_type, event_args)

def _handle_event_creation(self, event_type: EventType, event_args: EventArguments) -> None:
assert self._event_ws_factory is not None
self._event_storage.save_event(event)
self._event_ws_factory.broadcast_event(event)

self._last_event = event

def _handle_event_creation(self, event_type: EventType, event_args: EventArguments) -> BaseEvent:
"""Handles the creation of an event from PubSub's EventArguments, according to its EventType."""
create_event_fn: Callable[[EventType, EventArguments], BaseEvent]

if event_type in _GROUP_START_EVENTS:
Expand All @@ -153,12 +164,10 @@ def _handle_event_creation(self, event_type: EventType, event_args: EventArgumen

event = create_event_fn(event_type, event_args)

self._event_storage.save_event(event)
self._event_ws_factory.broadcast_event(event)

self._last_event = event
return event

def _create_group_start_event(self, event_type: EventType, event_args: EventArguments) -> BaseEvent:
"""Creates a group start event."""
assert self._event_group_is_closed(), 'A new event group cannot be started as one is already in progress.'

new_group_id = 0 if self._last_existing_group_id is None else self._last_existing_group_id + 1
Expand All @@ -172,6 +181,7 @@ def _create_group_start_event(self, event_type: EventType, event_args: EventArgu
)

def _create_group_end_event(self, event_type: EventType, event_args: EventArguments) -> BaseEvent:
"""Creates a group end event."""
assert self._last_event is not None, 'Cannot end event group if there are no events.'
assert not self._event_group_is_closed(), 'Cannot end event group as none is in progress.'

Expand All @@ -182,6 +192,7 @@ def _create_group_end_event(self, event_type: EventType, event_args: EventArgume
)

def _create_non_group_edge_event(self, event_type: EventType, event_args: EventArguments) -> BaseEvent:
"""Creates an event that neither a start nor an end event."""
group_id = None

if not self._event_group_is_closed():
Expand All @@ -200,6 +211,7 @@ def _create_event(
event_args: EventArguments,
group_id: Optional[int],
) -> BaseEvent:
"""Actually creates a BaseEvent."""
return BaseEvent.from_event_arguments(
event_id=0 if self._last_event is None else self._last_event.id + 1,
peer_id=self._peer_id,
Expand All @@ -209,20 +221,24 @@ def _create_event(
group_id=group_id,
)

def _handle_load_started(self):
def _handle_load_started(self) -> None:
"""Event specific handler for EventType.LOAD_STARTED."""
self._event_storage.save_node_state(NodeState.LOAD)

def _handle_load_finished(self):
def _handle_load_finished(self) -> None:
"""Event specific handler for EventType.LOAD_FINISHED."""
self._event_storage.save_node_state(NodeState.SYNC)

def _should_reload_events(self) -> bool:
"""Returns whether events should be reloaded or not."""
return self._previous_node_state in [None, NodeState.LOAD]

def get_event_queue_state(self) -> bool:
"""Get whether the event queue feature is enabled from the storage"""
"""Get whether the event queue feature is enabled from the storage."""
return self._event_storage.get_event_queue_state()

def save_event_queue_state(self, state: bool) -> None:
"""Saves whether the event queue feature is enabled from the storage."""
self._event_storage.save_event_queue_state(state)

def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransaction]) -> None:
Expand Down
1 change: 1 addition & 0 deletions hathor/event/model/base_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def from_event_arguments(
event_args: EventArguments,
group_id: Optional[NonNegativeInt]
) -> 'BaseEvent':
"""Creates a BaseEvent from PubSub's EventArguments."""
event_data_type = event_type.data_type()

return cls(
Expand Down
20 changes: 13 additions & 7 deletions hathor/event/model/event_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,30 +63,34 @@ def _parse_spent_outputs(cls, spent_output: Union[SpentOutput, list[Union[int, l
>>> TxMetadata._parse_spent_outputs([0, ['tx1', 'tx2']])
SpentOutput(index=0, tx_ids=['tx1', 'tx2'])
"""
if isinstance(spent_output, SpentOutput):
return spent_output
if isinstance(spent_output, list):
index, tx_ids = spent_output

index, tx_ids = spent_output
return SpentOutput(
index=cast(int, index),
tx_ids=cast(list[str], tx_ids)
)

return SpentOutput(
index=cast(int, index),
tx_ids=cast(list[str], tx_ids)
)
return spent_output


class BaseEventData(BaseModel):
"""Base class for event data polymorphism."""
@classmethod
def from_event_arguments(cls, args: EventArguments) -> 'EventData':
"""Returns an instance of this class by processing PubSub's EventArguments."""
raise NotImplementedError()


class EmptyData(BaseEventData):
"""Class that represents empty data on an event."""
@classmethod
def from_event_arguments(cls, args: EventArguments) -> 'EmptyData':
return cls()


class TxData(BaseEventData, extra=Extra.ignore):
"""Class that represents transaction data on an event."""
hash: str
nonce: Optional[int] = None
timestamp: int
Expand All @@ -110,6 +114,7 @@ def from_event_arguments(cls, args: EventArguments) -> 'TxData':


class ReorgData(BaseEventData):
"""Class that represents reorg data on an event."""
reorg_size: int
previous_best_block: str
new_best_block: str
Expand All @@ -125,4 +130,5 @@ def from_event_arguments(cls, args: EventArguments) -> 'ReorgData':
)


# Union type to encompass BaseEventData polymorphism
EventData = Union[EmptyData, TxData, ReorgData]
1 change: 1 addition & 0 deletions hathor/event/model/event_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class EventType(Enum):

@classmethod
def from_hathor_event(cls, hathor_event: HathorEvents) -> 'EventType':
"""Create an Event Queue feature EventType from a PubSub HathorEvents."""
event = _HATHOR_EVENT_TO_EVENT_TYPE.get(hathor_event)

assert event is not None, f'Cannot create EventType from {hathor_event}'
Expand Down
12 changes: 10 additions & 2 deletions hathor/event/websocket/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@


class EventWebsocketFactory(WebSocketServerFactory):
""" Websocket that will handle events
"""
"""WebSocket factory that handles the broadcasting of the Event Queue feature."""

protocol = EventWebsocketProtocol

# Whether the factory is running or not.
_is_running = False

# The last event id broadcast by this factory.
_latest_event_id: Optional[int] = None

def __init__(self, reactor: Reactor, event_storage: EventStorage):
Expand Down Expand Up @@ -85,6 +88,10 @@ def unregister(self, connection: EventWebsocketProtocol) -> None:
self._connections.discard(connection)

def send_next_event_to_connection(self, connection: EventWebsocketProtocol) -> None:
"""
Sends the next expected event to a connection, if it can receive the next event, and the event exists.
Will recurse asynchronously trying to send new events to the connection until it cannot receive more events.
"""
next_event_id = connection.next_expected_event_id()

if not connection.can_receive_event(next_event_id):
Expand All @@ -95,6 +102,7 @@ def send_next_event_to_connection(self, connection: EventWebsocketProtocol) -> N
self._reactor.callLater(0, self.send_next_event_to_connection, connection)

def _send_event_to_connection(self, connection: EventWebsocketProtocol, event: BaseEvent) -> None:
"""Sends an event to a connection, if it can receive this event."""
if not connection.can_receive_event(event.id):
return

Expand Down
Loading