diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index c3daad189..9ef90dac8 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -93,7 +93,6 @@ def __init__(self) -> None: self._force_memory_index: bool = False self._event_manager: Optional[EventManager] = None - self._event_ws_factory: Optional[EventWebsocketFactory] = None self._enable_event_queue: Optional[bool] = None self._rocksdb_path: Optional[str] = None @@ -374,11 +373,14 @@ def _get_or_create_event_storage(self) -> EventStorage: def _get_or_create_event_manager(self) -> EventManager: if self._event_manager is None: + reactor = self._get_reactor() + storage = self._get_or_create_event_storage() + factory = EventWebsocketFactory(reactor, storage) self._event_manager = EventManager( - reactor=self._get_reactor(), + reactor=reactor, pubsub=self._get_or_create_pubsub(), - event_storage=self._get_or_create_event_storage(), - event_ws_factory=self._event_ws_factory + event_storage=storage, + event_ws_factory=factory ) return self._event_manager @@ -460,10 +462,9 @@ def enable_wallet_index(self) -> 'Builder': self.enable_tokens_index() return self - def enable_event_manager(self, *, event_ws_factory: EventWebsocketFactory) -> 'Builder': + def enable_event_queue(self) -> 'Builder': self.check_if_can_modify() self._enable_event_queue = True - self._event_ws_factory = event_ws_factory return self def set_tx_storage(self, tx_storage: TransactionStorage) -> 'Builder': diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index 2d50f04c3..08716728c 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -76,6 +76,7 @@ def __init__( self._pubsub = pubsub def start(self, peer_id: str) -> None: + """Starts the EventManager.""" assert self._is_running is False, 'Cannot start, EventManager is already running' assert self._event_ws_factory is not None, 'Cannot start, EventWebsocketFactory is not set' assert self.get_event_queue_state() is True, 'Cannot start, event queue feature is disabled' @@ -95,40 +96,45 @@ def start(self, peer_id: str) -> None: self._event_ws_factory.start() self._is_running = True - def stop(self): + def stop(self) -> None: + """Stops the EventManager.""" assert self._is_running is True, 'Cannot stop, EventManager is not running' assert self._event_ws_factory is not None self._event_ws_factory.stop() self._is_running = False - def _assert_closed_event_group(self): + def _assert_closed_event_group(self) -> None: # XXX: we must check that the last event either does not belong to an event group or that it just closed an # event group, because we cannot resume an open group of events that wasn't properly closed before exit - assert ( - self._event_group_is_closed() - ), 'an unclosed event group was detected, which indicates the node crashed, cannot resume' + assert self._event_group_is_closed(), ( + 'an unclosed event group was detected, which indicates the node crashed, cannot resume' + ) - def _event_group_is_closed(self): + def _event_group_is_closed(self) -> bool: + """Returns whether the previous event group was properly closed, if there's one.""" return ( self._last_event is None or self._last_event.group_id is None or EventType(self._last_event.type) in _GROUP_END_EVENTS ) - def _subscribe_events(self): + def _subscribe_events(self) -> None: """ Subscribe to defined events for the pubsub received """ for event in _SUBSCRIBE_EVENTS: self._pubsub.subscribe(event, self._handle_hathor_event) def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None: + """Handles a PubSub 'HathorEvents' event.""" event_type = EventType.from_hathor_event(hathor_event) self._handle_event(event_type, event_args) def _handle_event(self, event_type: EventType, event_args: EventArguments) -> None: + """Handles an Event Queue feature 'EventType' event.""" assert self._is_running, 'Cannot handle event, EventManager is not started.' + assert self._event_ws_factory is not None event_specific_handlers = { EventType.LOAD_STARTED: self._handle_load_started, @@ -138,10 +144,15 @@ def _handle_event(self, event_type: EventType, event_args: EventArguments) -> No if event_specific_handler := event_specific_handlers.get(event_type): event_specific_handler() - self._handle_event_creation(event_type, event_args) + event = self._handle_event_creation(event_type, event_args) - def _handle_event_creation(self, event_type: EventType, event_args: EventArguments) -> None: - assert self._event_ws_factory is not None + self._event_storage.save_event(event) + self._event_ws_factory.broadcast_event(event) + + self._last_event = event + + def _handle_event_creation(self, event_type: EventType, event_args: EventArguments) -> BaseEvent: + """Handles the creation of an event from PubSub's EventArguments, according to its EventType.""" create_event_fn: Callable[[EventType, EventArguments], BaseEvent] if event_type in _GROUP_START_EVENTS: @@ -153,12 +164,10 @@ def _handle_event_creation(self, event_type: EventType, event_args: EventArgumen event = create_event_fn(event_type, event_args) - self._event_storage.save_event(event) - self._event_ws_factory.broadcast_event(event) - - self._last_event = event + return event def _create_group_start_event(self, event_type: EventType, event_args: EventArguments) -> BaseEvent: + """Creates a group start event.""" assert self._event_group_is_closed(), 'A new event group cannot be started as one is already in progress.' new_group_id = 0 if self._last_existing_group_id is None else self._last_existing_group_id + 1 @@ -172,6 +181,7 @@ def _create_group_start_event(self, event_type: EventType, event_args: EventArgu ) def _create_group_end_event(self, event_type: EventType, event_args: EventArguments) -> BaseEvent: + """Creates a group end event.""" assert self._last_event is not None, 'Cannot end event group if there are no events.' assert not self._event_group_is_closed(), 'Cannot end event group as none is in progress.' @@ -182,6 +192,7 @@ def _create_group_end_event(self, event_type: EventType, event_args: EventArgume ) def _create_non_group_edge_event(self, event_type: EventType, event_args: EventArguments) -> BaseEvent: + """Creates an event that neither a start nor an end event.""" group_id = None if not self._event_group_is_closed(): @@ -200,6 +211,7 @@ def _create_event( event_args: EventArguments, group_id: Optional[int], ) -> BaseEvent: + """Actually creates a BaseEvent.""" return BaseEvent.from_event_arguments( event_id=0 if self._last_event is None else self._last_event.id + 1, peer_id=self._peer_id, @@ -209,20 +221,24 @@ def _create_event( group_id=group_id, ) - def _handle_load_started(self): + def _handle_load_started(self) -> None: + """Event specific handler for EventType.LOAD_STARTED.""" self._event_storage.save_node_state(NodeState.LOAD) - def _handle_load_finished(self): + def _handle_load_finished(self) -> None: + """Event specific handler for EventType.LOAD_FINISHED.""" self._event_storage.save_node_state(NodeState.SYNC) def _should_reload_events(self) -> bool: + """Returns whether events should be reloaded or not.""" return self._previous_node_state in [None, NodeState.LOAD] def get_event_queue_state(self) -> bool: - """Get whether the event queue feature is enabled from the storage""" + """Get whether the event queue feature is enabled from the storage.""" return self._event_storage.get_event_queue_state() def save_event_queue_state(self, state: bool) -> None: + """Saves whether the event queue feature is enabled from the storage.""" self._event_storage.save_event_queue_state(state) def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransaction]) -> None: diff --git a/hathor/event/model/base_event.py b/hathor/event/model/base_event.py index 9c1e0c6b4..c64700fba 100644 --- a/hathor/event/model/base_event.py +++ b/hathor/event/model/base_event.py @@ -49,6 +49,7 @@ def from_event_arguments( event_args: EventArguments, group_id: Optional[NonNegativeInt] ) -> 'BaseEvent': + """Creates a BaseEvent from PubSub's EventArguments.""" event_data_type = event_type.data_type() return cls( diff --git a/hathor/event/model/event_data.py b/hathor/event/model/event_data.py index 7ab905473..343b445d2 100644 --- a/hathor/event/model/event_data.py +++ b/hathor/event/model/event_data.py @@ -63,30 +63,34 @@ def _parse_spent_outputs(cls, spent_output: Union[SpentOutput, list[Union[int, l >>> TxMetadata._parse_spent_outputs([0, ['tx1', 'tx2']]) SpentOutput(index=0, tx_ids=['tx1', 'tx2']) """ - if isinstance(spent_output, SpentOutput): - return spent_output + if isinstance(spent_output, list): + index, tx_ids = spent_output - index, tx_ids = spent_output + return SpentOutput( + index=cast(int, index), + tx_ids=cast(list[str], tx_ids) + ) - return SpentOutput( - index=cast(int, index), - tx_ids=cast(list[str], tx_ids) - ) + return spent_output class BaseEventData(BaseModel): + """Base class for event data polymorphism.""" @classmethod def from_event_arguments(cls, args: EventArguments) -> 'EventData': + """Returns an instance of this class by processing PubSub's EventArguments.""" raise NotImplementedError() class EmptyData(BaseEventData): + """Class that represents empty data on an event.""" @classmethod def from_event_arguments(cls, args: EventArguments) -> 'EmptyData': return cls() class TxData(BaseEventData, extra=Extra.ignore): + """Class that represents transaction data on an event.""" hash: str nonce: Optional[int] = None timestamp: int @@ -110,6 +114,7 @@ def from_event_arguments(cls, args: EventArguments) -> 'TxData': class ReorgData(BaseEventData): + """Class that represents reorg data on an event.""" reorg_size: int previous_best_block: str new_best_block: str @@ -125,4 +130,5 @@ def from_event_arguments(cls, args: EventArguments) -> 'ReorgData': ) +# Union type to encompass BaseEventData polymorphism EventData = Union[EmptyData, TxData, ReorgData] diff --git a/hathor/event/model/event_type.py b/hathor/event/model/event_type.py index d6f2bc8c4..59aa20f3e 100644 --- a/hathor/event/model/event_type.py +++ b/hathor/event/model/event_type.py @@ -28,6 +28,7 @@ class EventType(Enum): @classmethod def from_hathor_event(cls, hathor_event: HathorEvents) -> 'EventType': + """Create an Event Queue feature EventType from a PubSub HathorEvents.""" event = _HATHOR_EVENT_TO_EVENT_TYPE.get(hathor_event) assert event is not None, f'Cannot create EventType from {hathor_event}' diff --git a/hathor/event/websocket/factory.py b/hathor/event/websocket/factory.py index 41b19dd14..6f7a8b1f5 100644 --- a/hathor/event/websocket/factory.py +++ b/hathor/event/websocket/factory.py @@ -27,11 +27,14 @@ class EventWebsocketFactory(WebSocketServerFactory): - """ Websocket that will handle events - """ + """WebSocket factory that handles the broadcasting of the Event Queue feature.""" protocol = EventWebsocketProtocol + + # Whether the factory is running or not. _is_running = False + + # The last event id broadcast by this factory. _latest_event_id: Optional[int] = None def __init__(self, reactor: Reactor, event_storage: EventStorage): @@ -85,6 +88,10 @@ def unregister(self, connection: EventWebsocketProtocol) -> None: self._connections.discard(connection) def send_next_event_to_connection(self, connection: EventWebsocketProtocol) -> None: + """ + Sends the next expected event to a connection, if it can receive the next event, and the event exists. + Will recurse asynchronously trying to send new events to the connection until it cannot receive more events. + """ next_event_id = connection.next_expected_event_id() if not connection.can_receive_event(next_event_id): @@ -95,6 +102,7 @@ def send_next_event_to_connection(self, connection: EventWebsocketProtocol) -> N self._reactor.callLater(0, self.send_next_event_to_connection, connection) def _send_event_to_connection(self, connection: EventWebsocketProtocol, event: BaseEvent) -> None: + """Sends an event to a connection, if it can receive this event.""" if not connection.can_receive_event(event.id): return diff --git a/hathor/event/websocket/protocol.py b/hathor/event/websocket/protocol.py index 5a5906ec2..102617546 100644 --- a/hathor/event/websocket/protocol.py +++ b/hathor/event/websocket/protocol.py @@ -31,15 +31,23 @@ class EventWebsocketProtocol(WebSocketServerProtocol): - """ Websocket protocol, basically forwards some events to the Websocket factory. - """ + """WebSocket protocol that handles Event Queue feature commands.""" factory: 'EventWebsocketFactory' + + # The peer connected to this connection. client_peer: Optional[str] = None + # The last event id that was sent to this connection. _last_sent_event_id: Optional[int] = None + + # The last event id that was acknowledged by this connection. _ack_event_id: Optional[int] = None + + # The amount of events this connection can process. Essentially, its flux control. _window_size: int = 0 + + # Whether the stream is enabled or not. _stream_is_active: bool = False def __init__(self): @@ -47,7 +55,11 @@ def __init__(self): self.log = logger.new() def can_receive_event(self, event_id: int) -> bool: - """Returns whether this client is available to receive an event.""" + """ + Returns whether this client is available to receive an event. + Only the next expected event can be sent, if the stream is active. Also, there needs to be more slots in the + configured window than events that were sent but not acknowledged yet. + """ number_of_pending_events = 0 if self._last_sent_event_id is not None: @@ -89,6 +101,7 @@ def onMessage(self, payload: bytes, isBinary: bool) -> None: self.send_invalid_request_response(error.type, payload) def _handle_request(self, request: Request) -> None: + """Handles a request message according to its type.""" # This could be a pattern match in Python 3.10 request_type = type(request) handlers: dict[type, Callable] = { @@ -103,43 +116,49 @@ def _handle_request(self, request: Request) -> None: handle_fn(request) def _handle_start_stream_request(self, request: StartStreamRequest) -> None: + """ + Handles a StartStreamRequest message. + Sets all required state attributes and triggers the factory's recursion to send events while it's possible. + """ if self._stream_is_active: raise InvalidRequestError(InvalidRequestType.STREAM_IS_ACTIVE) - self._validate_ack(request.last_ack_event_id) - self._last_sent_event_id = request.last_ack_event_id - self._ack_event_id = request.last_ack_event_id + self._update_ack(request.last_ack_event_id) self._window_size = request.window_size self._stream_is_active = True self.factory.send_next_event_to_connection(self) def _handle_ack_request(self, request: AckRequest) -> None: + """ + Handles an AckRequest message. + Updates state attributes and triggers the factory's recursion to send events while it's possible + """ if not self._stream_is_active: raise InvalidRequestError(InvalidRequestType.STREAM_IS_INACTIVE) - self._validate_ack(request.ack_event_id) - - self._ack_event_id = request.ack_event_id + self._update_ack(request.ack_event_id) + self._last_sent_event_id = request.ack_event_id self._window_size = request.window_size self.factory.send_next_event_to_connection(self) def _handle_stop_stream_request(self) -> None: + """Handles a StopStreamRequest message.""" if not self._stream_is_active: raise InvalidRequestError(InvalidRequestType.STREAM_IS_INACTIVE) self._stream_is_active = False - def _validate_ack(self, ack_event_id: Optional[int]) -> None: - """Validates an ack_event_id from a request. + def _update_ack(self, ack_event_id: Optional[int]) -> None: + """Update the _ack_event_id if the new one is valid. - The ack_event_id can't be smaller than the last ack we've received - and can't be larger than the last event we've sent. + The ack_event_id must be greater than the last ack we've received, + and can't be greater than the last event we've sent. """ if self._ack_event_id is not None and ( - ack_event_id is None or ack_event_id < self._ack_event_id + ack_event_id is None or ack_event_id <= self._ack_event_id ): raise InvalidRequestError(InvalidRequestType.ACK_TOO_SMALL) @@ -148,7 +167,10 @@ def _validate_ack(self, ack_event_id: Optional[int]) -> None: ): raise InvalidRequestError(InvalidRequestType.ACK_TOO_LARGE) + self._ack_event_id = ack_event_id + def send_event_response(self, event_response: EventResponse) -> None: + """Send an EventResponse to this connection.""" self._send_response(event_response) self._last_sent_event_id = event_response.event.id @@ -158,6 +180,7 @@ def send_invalid_request_response( invalid_payload: Optional[bytes] = None, error_message: Optional[str] = None ) -> None: + """Send an InvalidRequestResponse to this connection.""" invalid_request = None if invalid_payload is None else invalid_payload.decode('utf8') response = InvalidRequestResponse( type=_type, @@ -168,6 +191,7 @@ def send_invalid_request_response( self._send_response(response) def _send_response(self, response: Response) -> None: + """Actually sends a response to this connection.""" payload = json_dumpb(response.dict()) try: diff --git a/tests/event/event_simulation_tester.py b/tests/event/event_simulation_tester.py new file mode 100644 index 000000000..31d3c0c11 --- /dev/null +++ b/tests/event/event_simulation_tester.py @@ -0,0 +1,113 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from json import JSONDecodeError +from typing import Any, Iterable +from unittest.mock import Mock + +import pytest +from twisted.internet.testing import StringTransport + +from hathor.builder import Builder +from hathor.event.websocket import EventWebsocketProtocol +from hathor.event.websocket.request import Request +from hathor.event.websocket.response import EventResponse, InvalidRequestResponse +from hathor.p2p.peer_id import PeerId +from hathor.transaction.util import unpack, unpack_len +from hathor.util import json_loadb +from tests.simulation.base import SimulatorTestCase +from tests.utils import HAS_ROCKSDB + + +class BaseEventSimulationTester(SimulatorTestCase): + builder: Builder + + def _create_artifacts(self) -> None: + peer_id = PeerId() + builder = self.builder.set_peer_id(peer_id) \ + .disable_full_verification() \ + .enable_event_queue() + artifacts = self.simulator.create_artifacts(builder) + + self.peer_id = peer_id.id + self.manager = artifacts.manager + self.manager.allow_mining_without_peers() + self.settings = artifacts.settings + + event_ws_factory = self.manager._event_manager._event_ws_factory + event_ws_factory.openHandshakeTimeout = 0 + + self.protocol = event_ws_factory.buildProtocol(addr=Mock()) + self.transport = StringTransport() + self.protocol.makeConnection(self.transport) + self.protocol.state = EventWebsocketProtocol.STATE_OPEN + self.protocol.onOpen() + + def _send_request(self, request: Request) -> None: + self.protocol.onMessage( + payload=request.json_dumpb(), + isBinary=False + ) + + def _get_success_responses(self) -> list[EventResponse]: + return list(map(EventResponse.parse_obj, self._get_transport_messages())) + + def _get_error_response(self) -> InvalidRequestResponse: + responses = self._get_transport_messages() + assert len(responses) == 1 + return InvalidRequestResponse.parse_obj(responses[0]) + + def _get_transport_messages(self) -> list[dict[str, Any]]: + values = self.transport.value() + result = self._decode_values(values) + + self.transport.clear() + + return list(result) + + @staticmethod + def _decode_values(values: bytes) -> Iterable[dict[str, Any]]: + buf = values + + while buf: + try: + (_, _, value_length), new_buf = unpack('!BBH', buf) + value, new_buf = unpack_len(value_length, new_buf) + yield json_loadb(value) + except JSONDecodeError: + (_, value_length), new_buf = unpack('!BB', buf) + value, new_buf = unpack_len(value_length, new_buf) + yield json_loadb(value) + + buf = new_buf + + +class MemoryEventSimulationTester(BaseEventSimulationTester): + def setUp(self) -> None: + super().setUp() + self.builder = self.simulator.get_default_builder() + self._create_artifacts() + + +@pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb') +class RocksDBEventSimulationTester(BaseEventSimulationTester): + def setUp(self) -> None: + super().setUp() + import tempfile + + directory = tempfile.mkdtemp() + self.tmpdirs.append(directory) + + self.builder = self.simulator.get_default_builder().use_rocksdb(path=directory) + self._create_artifacts() diff --git a/tests/event/test_event_manager.py b/tests/event/test_event_manager.py index 0f602a9fe..e8a09c83b 100644 --- a/tests/event/test_event_manager.py +++ b/tests/event/test_event_manager.py @@ -1,8 +1,5 @@ -from unittest.mock import Mock - from hathor.event.model.event_type import EventType from hathor.event.storage.memory_storage import EventMemoryStorage -from hathor.event.websocket import EventWebsocketFactory from hathor.pubsub import HathorEvents from tests import unittest @@ -16,7 +13,7 @@ def setUp(self): self.event_storage = EventMemoryStorage() self.manager = self.create_peer( self.network, - event_ws_factory=Mock(spec_set=EventWebsocketFactory), + enable_event_queue=True, full_verification=False, event_storage=self.event_storage ) diff --git a/tests/event/test_event_reorg.py b/tests/event/test_event_reorg.py index 878e21b39..d106f47e1 100644 --- a/tests/event/test_event_reorg.py +++ b/tests/event/test_event_reorg.py @@ -1,9 +1,7 @@ -from unittest.mock import Mock from hathor.conf import HathorSettings from hathor.event.model.event_type import EventType from hathor.event.storage import EventMemoryStorage -from hathor.event.websocket import EventWebsocketFactory from tests import unittest from tests.utils import add_new_blocks, get_genesis_key, zip_chunkify @@ -19,7 +17,7 @@ def setUp(self): self.event_storage = EventMemoryStorage() self.manager = self.create_peer( self.network, - event_ws_factory=Mock(spec_set=EventWebsocketFactory), + enable_event_queue=True, full_verification=False, event_storage=self.event_storage ) diff --git a/tests/event/test_event_simulation_responses.py b/tests/event/test_event_simulation_responses.py new file mode 100644 index 000000000..2bc628088 --- /dev/null +++ b/tests/event/test_event_simulation_responses.py @@ -0,0 +1,366 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.event.websocket.request import AckRequest, StartStreamRequest, StopStreamRequest +from hathor.event.websocket.response import InvalidRequestType +from hathor.simulator.trigger import StopAfterNMinedBlocks +from tests.event.event_simulation_tester import ( + BaseEventSimulationTester, + MemoryEventSimulationTester, + RocksDBEventSimulationTester, +) + + +class BaseEventSimulationResponsesTest(BaseEventSimulationTester): + def test_no_start_no_blocks(self) -> None: + self.simulator.run(36000) + + responses = self._get_success_responses() + + assert len(responses) == 0 # no events because not started + + def test_start_no_blocks(self) -> None: + start_stream = StartStreamRequest(type='START_STREAM', window_size=8, last_ack_event_id=None) + self._send_request(start_stream) + self.simulator.run(36000) + + responses = self._get_success_responses() + + assert len(responses) == 5 # genesis events + assert responses[0].event.id == 0 # no ack, so we get from the first event + + def test_start_no_blocks_with_ack(self) -> None: + start_stream = StartStreamRequest(type='START_STREAM', window_size=8, last_ack_event_id=2) + self._send_request(start_stream) + self.simulator.run(36000) + + responses = self._get_success_responses() + + assert len(responses) == 2 # genesis events 3 and 4 + assert responses[0].event.id == 3 # ack=2, so we get from event 3 + + def test_no_start_with_blocks(self) -> None: + miner = self.simulator.create_miner(self.manager, hashpower=1e6) + miner.start() + + trigger = StopAfterNMinedBlocks(miner, quantity=100) + self.simulator.run(36000, trigger=trigger) + + responses = self._get_success_responses() + + assert len(responses) == 0 # no events because not started + + def test_start_pre_blocks(self) -> None: + start_stream = StartStreamRequest(type='START_STREAM', window_size=8, last_ack_event_id=None) + self._send_request(start_stream) + self.simulator.run(36000) + + miner = self.simulator.create_miner(self.manager, hashpower=1e6) + miner.start() + + trigger = StopAfterNMinedBlocks(miner, quantity=100) + self.simulator.run(36000, trigger=trigger) + + responses = self._get_success_responses() + + assert len(responses) == 8 # 8 events because of window size + assert responses[0].event.id == 0 # no ack, so we get from the first event + + def test_start_pre_blocks_with_ack(self) -> None: + start_stream = StartStreamRequest(type='START_STREAM', window_size=8, last_ack_event_id=6) + self._send_request(start_stream) + self.simulator.run(36000) + + miner = self.simulator.create_miner(self.manager, hashpower=1e6) + miner.start() + + trigger = StopAfterNMinedBlocks(miner, quantity=100) + self.simulator.run(36000, trigger=trigger) + + responses = self._get_success_responses() + + assert len(responses) == 8 # 8 events because of window size + assert responses[0].event.id == 7 # ack=6, so we get from event 7 + + def test_start_post_blocks(self) -> None: + miner = self.simulator.create_miner(self.manager, hashpower=1e6) + miner.start() + + trigger = StopAfterNMinedBlocks(miner, quantity=100) + self.simulator.run(36000, trigger=trigger) + + start_stream = StartStreamRequest(type='START_STREAM', window_size=8, last_ack_event_id=None) + self._send_request(start_stream) + self.simulator.run(36000) + + responses = self._get_success_responses() + + assert len(responses) == 8 # 8 events because of window size + assert responses[0].event.id == 0 # no ack, so we get from the first event + + def test_start_post_blocks_with_ack(self) -> None: + miner = self.simulator.create_miner(self.manager, hashpower=1e6) + miner.start() + + trigger = StopAfterNMinedBlocks(miner, quantity=100) + self.simulator.run(36000, trigger=trigger) + + start_stream = StartStreamRequest(type='START_STREAM', window_size=8, last_ack_event_id=48) + self._send_request(start_stream) + self.simulator.run(36000) + + responses = self._get_success_responses() + + assert len(responses) == 8 # 8 events because of window size + assert responses[0].event.id == 49 # ack=48, so we get from event 49 + + def test_restart(self) -> None: + # start the event stream + start_stream = StartStreamRequest(type='START_STREAM', window_size=100, last_ack_event_id=None) + self._send_request(start_stream) + self.simulator.run(36000) + + # generate 10 blocks + miner = self.simulator.create_miner(self.manager, hashpower=1e6) + miner.start() + + trigger = StopAfterNMinedBlocks(miner, quantity=10) + self.simulator.run(36000, trigger=trigger) + + # get responses + responses = self._get_success_responses() + + # genesis events (5) + # + VERTEX_METADATA_CHANGED, one for each genesis tx (2) + # + one NEW_VERTEX_ACCEPTED and one VERTEX_METADATA_CHANGED for each new block (2*10) + # there are free slots in window_size + assert len(responses) == 5 + 2 + 2 * 10 # = 27 + assert responses[0].event.id == 0 # no ack, so we get from the first event + + # stop the event stream + stop_stream = StopStreamRequest(type='STOP_STREAM') + self._send_request(stop_stream) + + # generate 10 blocks + trigger.reset() + self.simulator.run(36000, trigger=trigger) + + # get responses + responses = self._get_success_responses() + + assert len(responses) == 0 # no events because stream is stopped + + # stop generating blocks + miner.stop() + + # restart event stream + self._send_request(start_stream) + self.simulator.run(36000) + + # get responses + responses = self._get_success_responses() + + # events from before (27) + # + one NEW_VERTEX_ACCEPTED and one VERTEX_METADATA_CHANGED for each new block (2*10) + assert len(responses) == 27 + 2 * 10 + assert responses[0].event.id == 0 # no ack, so we get from the first event + + def test_restart_with_ack(self) -> None: + # start the event stream + start_stream = StartStreamRequest(type='START_STREAM', window_size=100, last_ack_event_id=None) + self._send_request(start_stream) + self.simulator.run(36000) + + # generate 10 blocks + miner = self.simulator.create_miner(self.manager, hashpower=1e6) + miner.start() + + trigger = StopAfterNMinedBlocks(miner, quantity=10) + self.simulator.run(36000, trigger=trigger) + + # get responses + responses = self._get_success_responses() + + # genesis events (5) + # + VERTEX_METADATA_CHANGED, one for each genesis tx (2) + # + one NEW_VERTEX_ACCEPTED and one VERTEX_METADATA_CHANGED for each new block (2*10) + # there are free slots in window_size + assert len(responses) == 5 + 2 + 2 * 10 # = 27 + assert responses[0].event.id == 0 # no ack, so we get from the first event + + # stop the event stream + stop_stream = StopStreamRequest(type='STOP_STREAM') + self._send_request(stop_stream) + + # generate 10 blocks + trigger.reset() + self.simulator.run(36000, trigger=trigger) + + # get responses + responses = self._get_success_responses() + + assert len(responses) == 0 # no events because stream is stopped + + # stop generating blocks + miner.stop() + + # restart event stream from last event + start_stream = StartStreamRequest(type='START_STREAM', window_size=100, last_ack_event_id=26) + self._send_request(start_stream) + self.simulator.run(36000) + + # get responses + responses = self._get_success_responses() + + # one NEW_VERTEX_ACCEPTED and one VERTEX_METADATA_CHANGED for each new block (2*10) + assert len(responses) == 2 * 10 + assert responses[0].event.id == 27 # ack=26, so we get from event 27 + + def test_restart_with_ack_too_small(self) -> None: + # start the event stream + start_stream = StartStreamRequest(type='START_STREAM', window_size=100, last_ack_event_id=None) + self._send_request(start_stream) + self.simulator.run(36000) + + # generate 10 blocks + miner = self.simulator.create_miner(self.manager, hashpower=1e6) + + trigger = StopAfterNMinedBlocks(miner, quantity=10) + miner.start() + self.simulator.run(36000, trigger=trigger) + miner.stop() + + # get responses + responses = self._get_success_responses() + + # genesis events (5) + # + VERTEX_METADATA_CHANGED, one for each genesis tx (2) + # + one NEW_VERTEX_ACCEPTED and one VERTEX_METADATA_CHANGED for each new block (2*10) + # there are free slots in window_size + assert len(responses) == 5 + 2 + 2 * 10 # = 27 + assert responses[0].event.id == 0 # no ack, so we get from the first event + + # ack all received events + ack = AckRequest(type='ACK', window_size=100, ack_event_id=26) + self._send_request(ack) + self.simulator.run(36000) + + # stop the event stream + stop_stream = StopStreamRequest(type='STOP_STREAM') + self._send_request(stop_stream) + + # generate 10 blocks + trigger.reset() + miner.start() + self.simulator.run(36000, trigger=trigger) + miner.stop() + + # get responses + responses = self._get_success_responses() + + assert len(responses) == 0 # no events because stream is stopped + + # stop generating blocks + miner.stop() + + # restart event stream from ack too small + start_stream = StartStreamRequest(type='START_STREAM', window_size=100, last_ack_event_id=10) + self._send_request(start_stream) + self.simulator.run(36000) + + # get response + response = self._get_error_response() + + assert response.type == InvalidRequestType.ACK_TOO_SMALL.value + + def test_multiple_interactions(self) -> None: + miner = self.simulator.create_miner(self.manager, hashpower=1e6) + + # generate 10 blocks + trigger = StopAfterNMinedBlocks(miner, quantity=10) + miner.start() + self.simulator.run(36000, trigger=trigger) + miner.stop() + + # start the event stream + start_stream = StartStreamRequest(type='START_STREAM', window_size=1, last_ack_event_id=None) + self._send_request(start_stream) + self.simulator.run(36000) + + # get responses + responses = self._get_success_responses() + + assert len(responses) == 1 # 1 event because of window size + assert responses[0].event.id == 0 # no ack, so we get the first event + + # ack event + ack = AckRequest(type='ACK', window_size=1, ack_event_id=0) + self._send_request(ack) + self.simulator.run(36000) + + # get responses + responses = self._get_success_responses() + + assert len(responses) == 1 # 1 event because of window size + assert responses[0].event.id == 1 # ack=0, so we get from event 1 + + # increase window size + ack = AckRequest(type='ACK', window_size=4, ack_event_id=1) + self._send_request(ack) + self.simulator.run(36000) + + # get responses + responses = self._get_success_responses() + + assert len(responses) == 4 # 4 events because of window size + assert responses[0].event.id == 2 # ack=1, so we get from event 2 + + # same ack + self._send_request(ack) + self.simulator.run(36000) + + # get response + response = self._get_error_response() + + assert response.type == InvalidRequestType.ACK_TOO_SMALL.value # ACK too small because we've already sent it + + # new ack + ack = AckRequest(type='ACK', window_size=4, ack_event_id=5) + self._send_request(ack) + self.simulator.run(36000) + + # get responses + responses = self._get_success_responses() + + assert len(responses) == 4 # 4 events because of window size + assert responses[0].event.id == 6 # ack=5, so we get from event 6 + + # if we had failed processing some of the previous events, we wouldn't ACK all of them + ack = AckRequest(type='ACK', window_size=4, ack_event_id=7) + self._send_request(ack) + self.simulator.run(36000) + + # get responses + responses = self._get_success_responses() + + assert len(responses) == 4 # 4 events because of window size + assert responses[0].event.id == 8 # ack=7, so we get from event 8 + + +class MemoryEventSimulationResponsesTest(BaseEventSimulationResponsesTest, MemoryEventSimulationTester): + __test__ = True + + +class RocksDBEventSimulationResponsesTest(BaseEventSimulationResponsesTest, RocksDBEventSimulationTester): + __test__ = True diff --git a/tests/event/test_simulation.py b/tests/event/test_simulation.py index c2e299f9f..14e67bbb4 100644 --- a/tests/event/test_simulation.py +++ b/tests/event/test_simulation.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from unittest.mock import Mock - import pytest from hathor.event.model.base_event import BaseEvent @@ -40,7 +38,7 @@ def test_only_load(self): builder = simulator.get_default_builder() \ .set_peer_id(main_peer_id) \ .disable_full_verification() \ - .enable_event_manager(event_ws_factory=Mock()) + .enable_event_queue() main_manager = simulator.create_peer(builder) main_manager.allow_mining_without_peers() @@ -104,7 +102,7 @@ def test_single_chain_one_block(self): builder = simulator.get_default_builder() \ .set_peer_id(main_peer_id) \ .disable_full_verification() \ - .enable_event_manager(event_ws_factory=Mock()) + .enable_event_queue() main_manager = simulator.create_peer(builder) main_manager.allow_mining_without_peers() @@ -204,7 +202,7 @@ def test_single_chain_blocks_and_transactions(self): builder = simulator.get_default_builder() \ .set_peer_id(main_peer_id) \ .disable_full_verification() \ - .enable_event_manager(event_ws_factory=Mock()) + .enable_event_queue() main_manager = simulator.create_peer(builder) main_manager.allow_mining_without_peers() @@ -305,7 +303,7 @@ def test_reorg(self): builder = simulator.get_default_builder() \ .set_peer_id(main_peer_id) \ .disable_full_verification() \ - .enable_event_manager(event_ws_factory=Mock()) + .enable_event_queue() main_manager = simulator.create_peer(builder) main_manager.allow_mining_without_peers() diff --git a/tests/event/test_tx_metadata.py b/tests/event/test_tx_metadata.py new file mode 100644 index 000000000..cfe47c40d --- /dev/null +++ b/tests/event/test_tx_metadata.py @@ -0,0 +1,94 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.event.model.event_data import SpentOutput, TxMetadata + + +def test_from_spent_output_instance() -> None: + spent_outputs = [ + SpentOutput(index=0, tx_ids=['a', 'b']), + SpentOutput(index=1, tx_ids=['c', 'd']), + ] + metadata = TxMetadata( + hash='some_hash', + spent_outputs=spent_outputs, + conflict_with=[], + voided_by=[], + received_by=[], + children=[], + twins=[], + accumulated_weight=0, + score=0, + first_block=None, + height=0, + validation='some_validation' + ) + + assert metadata.spent_outputs == spent_outputs + + +def test_from_spent_output_list() -> None: + spent_outputs = [ + SpentOutput(index=0, tx_ids=['a', 'b']), + SpentOutput(index=1, tx_ids=['c', 'd']), + ] + metadata = TxMetadata.parse_obj( + dict( + hash='some_hash', + spent_outputs=[ + [0, ['a', 'b']], + [1, ['c', 'd']] + ], + conflict_with=[], + voided_by=[], + received_by=[], + children=[], + twins=[], + accumulated_weight=0, + score=0, + first_block=None, + height=0, + validation='some_validation' + ) + ) + + assert metadata.spent_outputs == spent_outputs + + +def test_from_spent_output_dict() -> None: + spent_outputs = [ + SpentOutput(index=0, tx_ids=['a', 'b']), + SpentOutput(index=1, tx_ids=['c', 'd']), + ] + metadata = TxMetadata.parse_obj( + dict( + hash='some_hash', + spent_outputs=[ + dict(index=0, tx_ids=['a', 'b']), + dict(index=1, tx_ids=['c', 'd']) + ], + conflict_with=[], + voided_by=[], + received_by=[], + children=[], + twins=[], + accumulated_weight=0, + score=0, + first_block=None, + height=0, + validation='some_validation' + ) + ) + + assert metadata.spent_outputs == spent_outputs diff --git a/tests/event/websocket/test_protocol.py b/tests/event/websocket/test_protocol.py index 646344c0d..0ebfeee1e 100644 --- a/tests/event/websocket/test_protocol.py +++ b/tests/event/websocket/test_protocol.py @@ -221,6 +221,10 @@ def test_on_valid_ack_message(ack_event_id, window_size, last_sent_event_id): @pytest.mark.parametrize( ['ack_event_id', 'window_size', 'last_sent_event_id'], [ + (0, 0, None), + (0, 1, None), + (10, 0, None), + (10, 1, None), (0, 0, 0), (0, 1, 10), (0, 10, 1), @@ -274,9 +278,13 @@ def test_start_message_on_active(): ['_ack_event_id', 'last_sent_event_id', 'ack_event_id', '_type'], [ (1, None, 0, InvalidRequestType.ACK_TOO_SMALL), + (1, None, 1, InvalidRequestType.ACK_TOO_SMALL), (1, 1, 0, InvalidRequestType.ACK_TOO_SMALL), + (1, 1, 1, InvalidRequestType.ACK_TOO_SMALL), (10, None, 5, InvalidRequestType.ACK_TOO_SMALL), + (10, None, 10, InvalidRequestType.ACK_TOO_SMALL), (10, 1, 5, InvalidRequestType.ACK_TOO_SMALL), + (10, 1, 10, InvalidRequestType.ACK_TOO_SMALL), (0, None, 1, InvalidRequestType.ACK_TOO_LARGE), (0, 0, 1, InvalidRequestType.ACK_TOO_LARGE), (5, None, 10, InvalidRequestType.ACK_TOO_LARGE), @@ -297,33 +305,26 @@ def test_on_invalid_ack_message(_ack_event_id, last_sent_event_id, ack_event_id, @pytest.mark.parametrize( - ['_ack_event_id', 'last_sent_event_id', 'ack_event_id', '_type'], + ['_ack_event_id', 'ack_event_id'], [ - (0, None, None, InvalidRequestType.ACK_TOO_SMALL), - (0, 1, None, InvalidRequestType.ACK_TOO_SMALL), - (1, None, 0, InvalidRequestType.ACK_TOO_SMALL), - (1, 1, 0, InvalidRequestType.ACK_TOO_SMALL), - (10, None, 5, InvalidRequestType.ACK_TOO_SMALL), - (10, 1, 5, InvalidRequestType.ACK_TOO_SMALL), - (None, None, 0, InvalidRequestType.ACK_TOO_LARGE), - (1, 0, 1, InvalidRequestType.ACK_TOO_LARGE), - (0, None, 1, InvalidRequestType.ACK_TOO_LARGE), - (0, 0, 1, InvalidRequestType.ACK_TOO_LARGE), - (5, None, 10, InvalidRequestType.ACK_TOO_LARGE), - (5, 1, 10, InvalidRequestType.ACK_TOO_LARGE), + (0, None), + (0, None), + (1, 0), + (1, 0), + (10, 5), + (10, 5), ] ) -def test_on_invalid_start_message(_ack_event_id, last_sent_event_id, ack_event_id, _type): +def test_on_invalid_start_message(_ack_event_id, ack_event_id): protocol = EventWebsocketProtocol() protocol._ack_event_id = _ack_event_id - protocol._last_sent_event_id = last_sent_event_id protocol.send_invalid_request_response = Mock() ack_event_id = 'null' if ack_event_id is None else ack_event_id payload = f'{{"type": "START_STREAM", "last_ack_event_id": {ack_event_id}, "window_size": 0}}'.encode('utf8') protocol.onMessage(payload, False) - protocol.send_invalid_request_response.assert_called_once_with(_type, payload) + protocol.send_invalid_request_response.assert_called_once_with(InvalidRequestType.ACK_TOO_SMALL, payload) @pytest.mark.parametrize( diff --git a/tests/unittest.py b/tests/unittest.py index 15c6d6cf9..b6988f7ea 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -177,7 +177,7 @@ def create_peer_from_builder(self, builder, start_manager=True): def create_peer(self, network, peer_id=None, wallet=None, tx_storage=None, unlock_wallet=True, wallet_index=False, capabilities=None, full_verification=True, enable_sync_v1=None, enable_sync_v2=None, checkpoints=None, utxo_index=False, event_manager=None, use_memory_index=None, start_manager=True, - pubsub=None, event_storage=None, event_ws_factory=None, use_memory_storage=None): + pubsub=None, event_storage=None, enable_event_queue=None, use_memory_storage=None): if enable_sync_v1 is None: assert hasattr(self, '_enable_sync_v1'), ('`_enable_sync_v1` has no default by design, either set one on ' 'the test class or pass `enable_sync_v1` by argument') @@ -213,8 +213,8 @@ def create_peer(self, network, peer_id=None, wallet=None, tx_storage=None, unloc if event_manager: builder.set_event_manager(event_manager) - if event_ws_factory: - builder.enable_event_manager(event_ws_factory=event_ws_factory) + if enable_event_queue: + builder.enable_event_queue() if tx_storage is not None: builder.set_tx_storage(tx_storage)