diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index c3815ed27..a5954880e 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -85,6 +85,7 @@ def __init__(self) -> None: self._event_manager: Optional[EventManager] = None self._event_ws_factory: Optional[EventWebsocketFactory] = None + self._enable_event_queue: Optional[bool] = None self._rocksdb_path: Optional[str] = None self._rocksdb_storage: Optional[RocksDBStorage] = None @@ -128,7 +129,6 @@ def build(self) -> BuildArtifacts: consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub) wallet = self._get_or_create_wallet() - event_storage = self._get_or_create_event_storage() event_manager = self._get_or_create_event_manager() tx_storage = self._get_or_create_tx_storage() indexes = tx_storage.indexes @@ -161,20 +161,22 @@ def build(self) -> BuildArtifacts: if self._full_verification is not None: kwargs['full_verification'] = self._full_verification + if self._enable_event_queue is not None: + kwargs['enable_event_queue'] = self._enable_event_queue + manager = HathorManager( reactor, pubsub=pubsub, consensus_algorithm=consensus_algorithm, peer_id=peer_id, tx_storage=tx_storage, - event_storage=event_storage, + event_manager=event_manager, network=self._network, wallet=wallet, rng=self._rng, checkpoints=self._checkpoints, capabilities=self._capabilities, environment_info=get_environment_info(self._cmdline, peer_id.id), - event_manager=event_manager, **kwargs ) @@ -303,8 +305,8 @@ def _get_or_create_event_storage(self) -> EventStorage: return self._event_storage - def _get_or_create_event_manager(self) -> Optional[EventManager]: - if self._event_manager is None and self._event_ws_factory is not None: + def _get_or_create_event_manager(self) -> EventManager: + if self._event_manager is None: self._event_manager = EventManager( reactor=self._get_reactor(), pubsub=self._get_or_create_pubsub(), @@ -389,6 +391,7 @@ def enable_wallet_index(self) -> 'Builder': def enable_event_manager(self, *, event_ws_factory: EventWebsocketFactory) -> 'Builder': self.check_if_can_modify() + self._enable_event_queue = True self._event_ws_factory = event_ws_factory return self diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index 7f22dfa42..b6ed4006a 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -136,19 +136,15 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa pubsub = PubSubManager(reactor) - event_manager: Optional[EventManager] = None if args.x_enable_event_queue: self.event_ws_factory = EventWebsocketFactory(reactor, event_storage) - event_manager = EventManager( - event_storage=event_storage, - event_ws_factory=self.event_ws_factory, - pubsub=pubsub, - reactor=reactor, - emit_load_events=args.x_emit_load_events - ) - else: - self.check_or_raise(not args.x_emit_load_events, '--x-emit-load-events cannot be used without ' - '--x-enable-event-queue') + + event_manager = EventManager( + event_storage=event_storage, + event_ws_factory=self.event_ws_factory, + pubsub=pubsub, + reactor=reactor + ) if args.wallet_index and tx_storage.indexes is not None: self.log.debug('enable wallet indexes') @@ -174,7 +170,6 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa network=network, hostname=hostname, tx_storage=tx_storage, - event_storage=event_storage, event_manager=event_manager, wallet=self.wallet, stratum_port=args.stratum, @@ -185,7 +180,8 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa enable_sync_v2=enable_sync_v2, consensus_algorithm=consensus_algorithm, environment_info=get_environment_info(args=str(args), peer_id=peer_id.id), - full_verification=full_verification + full_verification=full_verification, + enable_event_queue=bool(args.x_enable_event_queue) ) if args.data: @@ -232,7 +228,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa self.manager.enable_event_queue = True self.log.info('--x-enable-event-queue flag provided. ' - 'The events detected by the full node will be stored and retrieved to clients') + 'The events detected by the full node will be stored and can be retrieved by clients') for description in args.listen: self.manager.add_listen_address(description) diff --git a/hathor/builder/resources_builder.py b/hathor/builder/resources_builder.py index dd9022558..8925c0d5b 100644 --- a/hathor/builder/resources_builder.py +++ b/hathor/builder/resources_builder.py @@ -250,7 +250,7 @@ def create_resources(self, args: Namespace) -> server.Site: ws_factory.subscribe(self.manager.pubsub) # Event websocket resource - if args.x_enable_event_queue and self.event_ws_factory is not None: + if args.x_enable_event_queue: root.putChild(b'event_ws', WebSocketResource(self.event_ws_factory)) root.putChild(b'event', EventResource(self.manager._event_manager)) diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py index 8bd88e012..809592eb4 100644 --- a/hathor/cli/run_node.py +++ b/hathor/cli/run_node.py @@ -100,8 +100,6 @@ def create_parser(cls) -> ArgumentParser: parser.add_argument('--x-localhost-only', action='store_true', help='Only connect to peers on localhost') parser.add_argument('--x-rocksdb-indexes', action='store_true', help=SUPPRESS) parser.add_argument('--x-enable-event-queue', action='store_true', help='Enable event queue mechanism') - parser.add_argument('--x-emit-load-events', action='store_true', help='Enable emission of events during the ' - 'LOAD phase') parser.add_argument('--peer-id-blacklist', action='extend', default=[], nargs='+', type=str, help='Peer IDs to forbid connection') return parser diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index 4ab06cbe4..ff3197b29 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -12,15 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Optional +from typing import Callable, Iterator, Optional from structlog import get_logger from hathor.event.model.base_event import BaseEvent from hathor.event.model.event_type import EventType +from hathor.event.model.node_state import NodeState from hathor.event.storage import EventStorage from hathor.event.websocket import EventWebsocketFactory from hathor.pubsub import EventArguments, HathorEvents, PubSubManager +from hathor.transaction import BaseTransaction from hathor.util import Reactor logger = get_logger() @@ -51,7 +53,9 @@ class EventManager: _peer_id: str _is_running: bool = False - _load_finished: bool = False + _previous_node_state: Optional[NodeState] = None + _last_event: Optional[BaseEvent] = None + _last_existing_group_id: Optional[int] = None @property def event_storage(self) -> EventStorage: @@ -60,34 +64,40 @@ def event_storage(self) -> EventStorage: def __init__( self, event_storage: EventStorage, - event_ws_factory: EventWebsocketFactory, pubsub: PubSubManager, reactor: Reactor, - emit_load_events: bool = False + event_ws_factory: Optional[EventWebsocketFactory] = None, ): self.log = logger.new() - self._clock = reactor + self._reactor = reactor self._event_storage = event_storage self._event_ws_factory = event_ws_factory self._pubsub = pubsub - self.emit_load_events = emit_load_events - self._last_event = self._event_storage.get_last_event() - self._last_existing_group_id = self._event_storage.get_last_group_id() + def start(self, peer_id: str) -> None: + assert self._is_running is False, 'Cannot start, EventManager is already running' + assert self._event_ws_factory is not None, 'Cannot start, EventWebsocketFactory is not set' + assert self.get_event_queue_state() is True, 'Cannot start, event queue feature is disabled' + + self._previous_node_state = self._event_storage.get_node_state() + + if self._should_reload_events(): + self._event_storage.clear_events() + else: + self._last_event = self._event_storage.get_last_event() + self._last_existing_group_id = self._event_storage.get_last_group_id() self._assert_closed_event_group() self._subscribe_events() - def start(self, peer_id: str) -> None: - assert self._is_running is False, 'Cannot start, EventManager is already running' - self._peer_id = peer_id self._event_ws_factory.start() self._is_running = True def stop(self): assert self._is_running is True, 'Cannot stop, EventManager is not running' + assert self._event_ws_factory is not None self._event_ws_factory.stop() self._is_running = False @@ -110,25 +120,28 @@ def _subscribe_events(self): """ Subscribe to defined events for the pubsub received """ for event in _SUBSCRIBE_EVENTS: - self._pubsub.subscribe(event, self._handle_event) + self._pubsub.subscribe(event, self._handle_hathor_event) + + def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None: + event_type = EventType.from_hathor_event(hathor_event) + + self._handle_event(event_type, event_args) - def _handle_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None: + def _handle_event(self, event_type: EventType, event_args: EventArguments) -> None: assert self._is_running, 'Cannot handle event, EventManager is not started.' - event_type = EventType.from_hathor_event(hathor_event) event_specific_handlers = { + EventType.LOAD_STARTED: self._handle_load_started, EventType.LOAD_FINISHED: self._handle_load_finished } if event_specific_handler := event_specific_handlers.get(event_type): event_specific_handler() - if not self._load_finished and not self.emit_load_events: - return - self._handle_event_creation(event_type, event_args) def _handle_event_creation(self, event_type: EventType, event_args: EventArguments) -> None: + assert self._event_ws_factory is not None create_event_fn: Callable[[EventType, EventArguments], BaseEvent] if event_type in _GROUP_START_EVENTS: @@ -181,9 +194,6 @@ def _create_non_group_edge_event(self, event_type: EventType, event_args: EventA group_id=group_id, ) - def _handle_load_finished(self): - self._load_finished = True - def _create_event( self, event_type: EventType, @@ -193,8 +203,42 @@ def _create_event( return BaseEvent.from_event_arguments( event_id=0 if self._last_event is None else self._last_event.id + 1, peer_id=self._peer_id, - timestamp=self._clock.seconds(), + timestamp=self._reactor.seconds(), event_type=event_type, event_args=event_args, group_id=group_id, ) + + def _handle_load_started(self): + self._event_storage.save_node_state(NodeState.LOAD) + + def _handle_load_finished(self): + self._event_storage.save_node_state(NodeState.SYNC) + + def _should_reload_events(self) -> bool: + return self._previous_node_state in [None, NodeState.LOAD] + + def get_event_queue_state(self) -> bool: + """Get whether the event queue feature is enabled from the storage""" + return self._event_storage.get_event_queue_state() + + def save_event_queue_state(self, state: bool) -> None: + self._event_storage.save_event_queue_state(state) + + def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransaction]) -> None: + """ + Either generates load phase events or not, depending on previous node state. + Does so asynchronously so events generated here are not processed before normal event handling. + """ + assert self._is_running, 'Cannot handle load phase events, EventManager is not started.' + + if not self._should_reload_events(): + return + + for vertex in topological_iterator: + self._reactor.callLater( + delay=0, + callable=self._handle_event, + event_type=EventType.NEW_VERTEX_ACCEPTED, + event_args=EventArguments(tx=vertex) + ) diff --git a/hathor/event/model/node_state.py b/hathor/event/model/node_state.py new file mode 100644 index 000000000..161e0061d --- /dev/null +++ b/hathor/event/model/node_state.py @@ -0,0 +1,20 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum + + +class NodeState(Enum): + LOAD = 0 + SYNC = 1 diff --git a/hathor/event/storage/event_storage.py b/hathor/event/storage/event_storage.py index c91b14d79..90a68d761 100644 --- a/hathor/event/storage/event_storage.py +++ b/hathor/event/storage/event_storage.py @@ -16,6 +16,7 @@ from typing import Iterator, Optional from hathor.event.model.base_event import BaseEvent +from hathor.event.model.node_state import NodeState class EventStorage(ABC): @@ -43,3 +44,28 @@ def get_last_group_id(self) -> Optional[int]: def iter_from_event(self, key: int) -> Iterator[BaseEvent]: """ Iterate through events starting from the event with the given key""" raise NotImplementedError + + @abstractmethod + def clear_events(self) -> None: + """Clear all stored events and related metadata.""" + raise NotImplementedError + + @abstractmethod + def save_node_state(self, state: NodeState) -> None: + """Save a node state in the storage""" + raise NotImplementedError + + @abstractmethod + def get_node_state(self) -> Optional[NodeState]: + """Get the node state from the storage""" + raise NotImplementedError + + @abstractmethod + def save_event_queue_state(self, enabled: bool) -> None: + """Save whether the event queue feature is enabled in the storage""" + raise NotImplementedError + + @abstractmethod + def get_event_queue_state(self) -> bool: + """Get whether the event queue feature is enabled from the storage""" + raise NotImplementedError diff --git a/hathor/event/storage/memory_storage.py b/hathor/event/storage/memory_storage.py index 3d28a2035..569d51d0f 100644 --- a/hathor/event/storage/memory_storage.py +++ b/hathor/event/storage/memory_storage.py @@ -15,6 +15,7 @@ from typing import Iterator, List, Optional from hathor.event.model.base_event import BaseEvent +from hathor.event.model.node_state import NodeState from hathor.event.storage.event_storage import EventStorage @@ -23,6 +24,8 @@ def __init__(self) -> None: self._events: List[BaseEvent] = [] self._last_event: Optional[BaseEvent] = None self._last_group_id: Optional[int] = None + self._node_state: Optional[NodeState] = None + self._event_queue_enabled: bool = False def save_event(self, event: BaseEvent) -> None: if event.id != len(self._events): @@ -54,3 +57,20 @@ def iter_from_event(self, key: int) -> Iterator[BaseEvent]: while key < len(self._events): yield self._events[key] key += 1 + + def clear_events(self) -> None: + self._events = [] + self._last_event = None + self._last_group_id = None + + def save_node_state(self, state: NodeState) -> None: + self._node_state = state + + def get_node_state(self) -> Optional[NodeState]: + return self._node_state + + def save_event_queue_state(self, enabled: bool) -> None: + self._event_queue_enabled = enabled + + def get_event_queue_state(self) -> bool: + return self._event_queue_enabled diff --git a/hathor/event/storage/rocksdb_storage.py b/hathor/event/storage/rocksdb_storage.py index 5f5f02424..eae5f5305 100644 --- a/hathor/event/storage/rocksdb_storage.py +++ b/hathor/event/storage/rocksdb_storage.py @@ -15,21 +15,27 @@ from typing import Iterator, Optional from hathor.event.model.base_event import BaseEvent +from hathor.event.model.node_state import NodeState from hathor.event.storage.event_storage import EventStorage from hathor.storage.rocksdb_storage import RocksDBStorage -from hathor.transaction.util import int_to_bytes +from hathor.transaction.util import bytes_to_int, int_to_bytes from hathor.util import json_dumpb _CF_NAME_EVENT = b'event' _CF_NAME_META = b'event-metadata' _KEY_LAST_GROUP_ID = b'last-group-id' +_KEY_NODE_STATE = b'node-state' +_KEY_EVENT_QUEUE_ENABLED = b'event-queue-enabled' class EventRocksDBStorage(EventStorage): def __init__(self, rocksdb_storage: RocksDBStorage): - self._db = rocksdb_storage.get_db() - self._cf_event = rocksdb_storage.get_or_create_column_family(_CF_NAME_EVENT) - self._cf_meta = rocksdb_storage.get_or_create_column_family(_CF_NAME_META) + self._rocksdb_storage = rocksdb_storage + + self._db = self._rocksdb_storage.get_db() + self._cf_event = self._rocksdb_storage.get_or_create_column_family(_CF_NAME_EVENT) + self._cf_meta = self._rocksdb_storage.get_or_create_column_family(_CF_NAME_META) + self._last_event: Optional[BaseEvent] = self._db_get_last_event() self._last_group_id: Optional[int] = self._db_get_last_group_id() @@ -57,11 +63,11 @@ def _db_get_last_group_id(self) -> Optional[int]: last_group_id = self._db.get((self._cf_meta, _KEY_LAST_GROUP_ID)) if last_group_id is None: return None - return int.from_bytes(last_group_id, byteorder='big', signed=False) + return bytes_to_int(last_group_id) def save_event(self, event: BaseEvent) -> None: if (self._last_event is None and event.id != 0) or \ - (self._last_event is not None and event.id > self._last_event.id + 1): + (self._last_event is not None and event.id != self._last_event.id + 1): raise ValueError('invalid event.id, ids must be sequential and leave no gaps') event_data = json_dumpb(event.dict()) key = int_to_bytes(event.id, 8) @@ -84,3 +90,39 @@ def get_last_event(self) -> Optional[BaseEvent]: def get_last_group_id(self) -> Optional[int]: return self._last_group_id + + def clear_events(self) -> None: + self._last_event = None + self._last_group_id = None + + self._db.delete((self._cf_meta, _KEY_LAST_GROUP_ID)) + self._db.drop_column_family(self._cf_event) + + self._cf_event = self._rocksdb_storage.get_or_create_column_family(_CF_NAME_EVENT) + + def save_node_state(self, state: NodeState) -> None: + self._db.put((self._cf_meta, _KEY_NODE_STATE), int_to_bytes(state.value, 8)) + + def get_node_state(self) -> Optional[NodeState]: + node_state_bytes = self._db.get((self._cf_meta, _KEY_NODE_STATE)) + + if node_state_bytes is None: + return None + + node_state_int = bytes_to_int(node_state_bytes) + + return NodeState(node_state_int) + + def save_event_queue_state(self, enabled: bool) -> None: + self._db.put( + (self._cf_meta, _KEY_EVENT_QUEUE_ENABLED), + enabled.to_bytes(length=1, byteorder='big') + ) + + def get_event_queue_state(self) -> bool: + enabled_bytes = self._db.get((self._cf_meta, _KEY_EVENT_QUEUE_ENABLED)) + + if enabled_bytes is None: + return False + + return bool.from_bytes(enabled_bytes, byteorder='big') diff --git a/hathor/manager.py b/hathor/manager.py index 642c0d775..b608f4386 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -30,7 +30,6 @@ from hathor.conf import HathorSettings from hathor.consensus import ConsensusAlgorithm from hathor.event.event_manager import EventManager -from hathor.event.storage import EventStorage from hathor.exception import ( DoubleSpendingError, HathorError, @@ -88,11 +87,10 @@ def __init__(self, consensus_algorithm: ConsensusAlgorithm, peer_id: PeerId, tx_storage: TransactionStorage, - event_storage: EventStorage, + event_manager: EventManager, network: str, hostname: Optional[str] = None, wallet: Optional[BaseWallet] = None, - event_manager: Optional[EventManager] = None, stratum_port: Optional[int] = None, ssl: bool = True, enable_sync_v1: bool = False, @@ -102,7 +100,8 @@ def __init__(self, checkpoints: Optional[List[Checkpoint]] = None, rng: Optional[Random] = None, environment_info: Optional[EnvironmentInfo] = None, - full_verification: bool = False): + full_verification: bool = False, + enable_event_queue: bool = False): """ :param reactor: Twisted reactor which handles the mainloop and the events. :param peer_id: Id of this node. @@ -125,6 +124,11 @@ def __init__(self, if not (enable_sync_v1 or enable_sync_v1_1 or enable_sync_v2): raise TypeError(f'{type(self).__name__}() at least one sync version is required') + if event_manager.get_event_queue_state() is True and not enable_event_queue: + raise ValueError( + 'cannot start manager without event queue feature, as it was enabled in the previous startup' + ) + self._enable_sync_v1 = enable_sync_v1 self._enable_sync_v2 = enable_sync_v2 @@ -172,9 +176,8 @@ def __init__(self, self.tx_storage.pubsub = self.pubsub self._event_manager = event_manager - - if self._event_manager: - assert self._event_manager.event_storage == event_storage + self._event_manager.save_event_queue_state(enable_event_queue) + self._enable_event_queue = enable_event_queue if enable_sync_v2: assert self.tx_storage.indexes is not None @@ -285,7 +288,7 @@ def start(self) -> None: ) sys.exit(-1) - if self._event_manager: + if self._enable_event_queue: self._event_manager.start(not_none(self.my_peer.id)) self.state = self.NodeState.INITIALIZING @@ -355,7 +358,7 @@ def stop(self) -> Deferred: if wait_stratum: waits.append(wait_stratum) - if self._event_manager: + if self._enable_event_queue: self._event_manager.stop() self.tx_storage.flush() @@ -396,7 +399,7 @@ def _initialize_components(self) -> None: This method runs through all transactions, verifying them and updating our wallet. """ - assert not self._event_manager, 'this method cannot be used if the events feature is enabled.' + assert not self._enable_event_queue, 'this method cannot be used if the events feature is enabled.' self.log.info('initialize') if self.wallet: @@ -649,6 +652,11 @@ def _initialize_components_new(self) -> None: # XXX: last step before actually starting is updating the last started at timestamps self.tx_storage.update_last_started_at(started_at) + + if self._enable_event_queue: + topological_iterator = self.tx_storage.topological_iterator() + self._event_manager.handle_load_phase_vertices(topological_iterator) + self.state = self.NodeState.READY self.pubsub.publish(HathorEvents.LOAD_FINISHED) diff --git a/hathor/pubsub.py b/hathor/pubsub.py index 255088f80..0a0e0153e 100644 --- a/hathor/pubsub.py +++ b/hathor/pubsub.py @@ -90,9 +90,6 @@ class HathorEvents(Enum): REORG_FINISHED Triggered when consensus algorithm ends all changes involved in a reorg - - VERTEX_METADATA_CHANGED - Triggered when consensus algorithm changes a metadata of an existing vertex (transaction or block) """ MANAGER_ON_START = 'manager:on_start' MANAGER_ON_STOP = 'manager:on_stop' diff --git a/hathor/transaction/util.py b/hathor/transaction/util.py index 0f88ed86c..fb9b1b9c7 100644 --- a/hathor/transaction/util.py +++ b/hathor/transaction/util.py @@ -28,6 +28,19 @@ def int_to_bytes(number: int, size: int, signed: bool = False) -> bytes: return number.to_bytes(size, byteorder='big', signed=signed) +def bytes_to_int(data: bytes, *, signed: bool = False) -> int: + """ + Converts data in bytes to an int. Assumes big-endian format. + + Args: + data: bytes to be converted + signed: whether two's complement is used to represent the integer. + + Returns: the converted data as int + """ + return int.from_bytes(data, byteorder='big', signed=signed) + + def unpack(fmt: str, buf: bytes) -> Any: size = struct.calcsize(fmt) return struct.unpack(fmt, buf[:size]), buf[size:] diff --git a/tests/event/test_event_manager.py b/tests/event/test_event_manager.py index 1b4f7250f..0f602a9fe 100644 --- a/tests/event/test_event_manager.py +++ b/tests/event/test_event_manager.py @@ -42,19 +42,32 @@ def test_event_group(self): self._fake_reorg_started() self._fake_reorg_finished() self.run_to_completion() + event0 = self.event_storage.get_event(0) event1 = self.event_storage.get_event(1) event2 = self.event_storage.get_event(2) event3 = self.event_storage.get_event(3) event4 = self.event_storage.get_event(4) - self.assertEqual(EventType(event0.type), EventType.LOAD_FINISHED) - self.assertEqual(EventType(event1.type), EventType.REORG_STARTED) - self.assertIsNotNone(event1.group_id) - self.assertEqual(EventType(event2.type), EventType.REORG_FINISHED) - self.assertIsNotNone(event2.group_id) - self.assertEqual(event1.group_id, event2.group_id) - self.assertNotEqual(event2.group_id, event3.group_id) - self.assertEqual(event3.group_id, event4.group_id) + event5 = self.event_storage.get_event(5) + event6 = self.event_storage.get_event(6) + event7 = self.event_storage.get_event(7) + event8 = self.event_storage.get_event(8) + + self.assertEqual(EventType(event0.type), EventType.LOAD_STARTED) + self.assertEqual(EventType(event1.type), EventType.NEW_VERTEX_ACCEPTED) + self.assertEqual(EventType(event2.type), EventType.NEW_VERTEX_ACCEPTED) + self.assertEqual(EventType(event3.type), EventType.NEW_VERTEX_ACCEPTED) + self.assertEqual(EventType(event4.type), EventType.LOAD_FINISHED) + self.assertEqual(EventType(event5.type), EventType.REORG_STARTED) + + self.assertIsNotNone(event5.group_id) + self.assertEqual(EventType(event6.type), EventType.REORG_FINISHED) + self.assertIsNotNone(event6.group_id) + self.assertEqual(event5.group_id, event6.group_id) + + self.assertNotEqual(event6.group_id, event7.group_id) + self.assertIsNotNone(event7.group_id) + self.assertEqual(event7.group_id, event8.group_id) def test_cannot_start_group_twice(self): self._fake_reorg_started() diff --git a/tests/event/test_event_reorg.py b/tests/event/test_event_reorg.py index fa69a4fac..878e21b39 100644 --- a/tests/event/test_event_reorg.py +++ b/tests/event/test_event_reorg.py @@ -58,6 +58,10 @@ class unsorted(list): pass expected_events_grouped = [ [ + (EventType.LOAD_STARTED, {}), + (EventType.NEW_VERTEX_ACCEPTED, {'hash': settings.GENESIS_BLOCK_HASH.hex()}), + (EventType.NEW_VERTEX_ACCEPTED, {'hash': settings.GENESIS_TX1_HASH.hex()}), + (EventType.NEW_VERTEX_ACCEPTED, {'hash': settings.GENESIS_TX2_HASH.hex()}), (EventType.LOAD_FINISHED, {}) ], # XXX: the order of the following events can vary depending on which genesis is spent/confirmed first diff --git a/tests/event/test_event_storage.py b/tests/event/test_event_storage.py index f2441f3a5..4889bcb15 100644 --- a/tests/event/test_event_storage.py +++ b/tests/event/test_event_storage.py @@ -2,6 +2,8 @@ import pytest +from hathor.event.model.node_state import NodeState +from hathor.event.storage import EventStorage from hathor.event.storage.memory_storage import EventMemoryStorage from hathor.event.storage.rocksdb_storage import EventRocksDBStorage from hathor.storage.rocksdb_storage import RocksDBStorage @@ -12,6 +14,8 @@ class EventStorageBaseTest(unittest.TestCase): __test__ = False + event_storage: EventStorage + def setUp(self): super().setUp() self.event_mocker = EventMocker(self.rng) @@ -92,15 +96,88 @@ def test_iter_from_event_negative_key(self): def test_save_events_and_retrieve_last_group_id(self): expected_group_id = 4 - for i in range(10): - group_id = i if i <= expected_group_id else None - event = self.event_mocker.generate_mocked_event(i, group_id) - self.event_storage.save_event(event) + + self._populate_events_and_last_group_id(n_events=10, last_group_id=expected_group_id) actual_group_id = self.event_storage.get_last_group_id() assert expected_group_id == actual_group_id + def _populate_events_and_last_group_id(self, n_events: int, last_group_id: int) -> None: + for i in range(n_events): + group_id = i if i <= last_group_id else None + event = self.event_mocker.generate_mocked_event(i, group_id) + self.event_storage.save_event(event) + + def test_get_empty_node_state(self): + node_state = self.event_storage.get_node_state() + + assert node_state is None + + def test_save_node_state_and_retrieve(self): + self.event_storage.save_node_state(NodeState.SYNC) + node_state = self.event_storage.get_node_state() + + assert node_state == NodeState.SYNC + + def test_get_empty_event_queue_state(self): + enabled = self.event_storage.get_event_queue_state() + + assert enabled is False + + def test_save_event_queue_enabled_and_retrieve(self): + self.event_storage.save_event_queue_state(True) + enabled = self.event_storage.get_event_queue_state() + + assert enabled is True + + def test_save_event_queue_disabled_and_retrieve(self): + self.event_storage.save_event_queue_state(False) + enabled = self.event_storage.get_event_queue_state() + + assert enabled is False + + def test_clear_events_empty_database(self): + self._test_clear_events() + + def _test_clear_events(self) -> None: + self.event_storage.clear_events() + + events = list(self.event_storage.iter_from_event(0)) + last_event = self.event_storage.get_last_event() + last_group_id = self.event_storage.get_last_group_id() + + assert events == [] + assert last_event is None + assert last_group_id is None + + def test_clear_events_full_database(self): + n_events = 10 + expected_last_group_id = 4 + expected_node_state = NodeState.SYNC + + self._populate_events_and_last_group_id(n_events=n_events, last_group_id=4) + self.event_storage.save_node_state(expected_node_state) + self.event_storage.save_event_queue_state(True) + + events = list(self.event_storage.iter_from_event(0)) + last_group_id = self.event_storage.get_last_group_id() + node_state = self.event_storage.get_node_state() + event_queue_state = self.event_storage.get_event_queue_state() + + assert len(events) == n_events + assert last_group_id == expected_last_group_id + assert node_state == expected_node_state + assert event_queue_state is True + + self._test_clear_events() + + node_state = self.event_storage.get_node_state() + event_queue_state = self.event_storage.get_event_queue_state() + + assert node_state == expected_node_state + assert event_queue_state is True + @pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb') class EventStorageRocksDBTest(EventStorageBaseTest): diff --git a/tests/others/test_cli_builder.py b/tests/others/test_cli_builder.py index eb248003e..4aa4b0e00 100644 --- a/tests/others/test_cli_builder.py +++ b/tests/others/test_cli_builder.py @@ -58,7 +58,7 @@ def test_all_default(self): self.assertNotIn(SyncVersion.V2, manager.connections._sync_factories) self.assertFalse(self.resources_builder._built_prometheus) self.assertFalse(self.resources_builder._built_status) - self.assertIsNone(manager._event_manager) + self.assertFalse(manager._enable_event_queue) @pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb') def test_cache_storage(self): @@ -163,7 +163,6 @@ def test_event_queue_with_rocksdb_storage(self): self.assertIsInstance(manager._event_manager, EventManager) self.assertIsInstance(manager._event_manager._event_storage, EventRocksDBStorage) self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory) - self.assertFalse(manager._event_manager.emit_load_events) def test_event_queue_with_memory_storage(self): manager = self._build(['--x-enable-event-queue', '--memory-storage']) @@ -171,16 +170,7 @@ def test_event_queue_with_memory_storage(self): self.assertIsInstance(manager._event_manager, EventManager) self.assertIsInstance(manager._event_manager._event_storage, EventMemoryStorage) self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory) - self.assertFalse(manager._event_manager.emit_load_events) def test_event_queue_with_full_verification(self): args = ['--x-enable-event-queue', '--memory-storage', '--x-full-verification'] self._build_with_error(args, '--x-full-verification cannot be used with --x-enable-event-queue') - - def test_event_queue_with_emit_load_events(self): - manager = self._build(['--x-enable-event-queue', '--memory-storage', '--x-emit-load-events']) - - self.assertIsInstance(manager._event_manager, EventManager) - self.assertIsInstance(manager._event_manager._event_storage, EventMemoryStorage) - self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory) - self.assertTrue(manager._event_manager.emit_load_events)