diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index 59065e330..6cb3fdd36 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -22,6 +22,7 @@ from hathor.conf.settings import HathorSettings as HathorSettingsType from hathor.consensus import ConsensusAlgorithm from hathor.event import EventManager +from hathor.event.model.EventQueueOptions import EventQueueOptions from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage from hathor.event.websocket import EventWebsocketFactory from hathor.indexes import IndexesManager @@ -83,7 +84,7 @@ def __init__(self) -> None: self._event_manager: Optional[EventManager] = None self._event_ws_factory: Optional[EventWebsocketFactory] = None - self._enable_event_queue: Optional[bool] = None + self._event_queue_options: Optional[EventQueueOptions] = None self._rocksdb_path: Optional[str] = None self._rocksdb_storage: Optional[RocksDBStorage] = None @@ -158,8 +159,8 @@ def build(self) -> BuildArtifacts: if self._full_verification is not None: kwargs['full_verification'] = self._full_verification - if self._enable_event_queue is not None: - kwargs['enable_event_queue'] = self._enable_event_queue + if self._event_queue_options is not None: + kwargs['event_queue_options'] = self._event_queue_options manager = HathorManager( reactor, @@ -387,7 +388,7 @@ def enable_wallet_index(self) -> 'Builder': def enable_event_manager(self, *, event_ws_factory: EventWebsocketFactory) -> 'Builder': self.check_if_can_modify() - self._enable_event_queue = True + self._event_queue_options = EventQueueOptions(enable=True) self._event_ws_factory = event_ws_factory return self diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index 064f19e2a..2a76a354a 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -28,6 +28,7 @@ from hathor.consensus import ConsensusAlgorithm from hathor.event import EventManager +from hathor.event.model.EventQueueOptions import EventQueueOptions from hathor.event.resources.event import EventResource from hathor.exception import BuilderError from hathor.indexes import IndexesManager @@ -170,6 +171,19 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa soft_voided_tx_ids = set(settings.SOFT_VOIDED_TX_IDS) consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub=pubsub) + if args.x_enable_event_queue: + if not settings.ENABLE_EVENT_QUEUE_FEATURE: + self.log.error('The event queue feature is not available yet') + sys.exit(-1) + + self.log.info('--x-enable-event-queue flag provided. ' + 'The events detected by the full node will be stored and can be retrieved by clients') + + event_queue_options = EventQueueOptions( + enable=bool(args.x_enable_event_queue), + reset=bool(args.x_reset_event_queue) + ) + self.manager = HathorManager( reactor, pubsub=pubsub, @@ -187,7 +201,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa consensus_algorithm=consensus_algorithm, environment_info=get_environment_info(args=str(args), peer_id=peer_id.id), full_verification=full_verification, - enable_event_queue=bool(args.x_enable_event_queue) + event_queue_options=event_queue_options ) if args.allow_mining_without_peers: @@ -224,15 +238,6 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa if args.memory_indexes and args.memory_storage: self.log.warn('--memory-indexes is implied for memory storage or JSON storage') - if args.x_enable_event_queue: - if not settings.ENABLE_EVENT_QUEUE_FEATURE: - self.log.error('The event queue feature is not available yet') - sys.exit(-1) - - self.manager.enable_event_queue = True - self.log.info('--x-enable-event-queue flag provided. ' - 'The events detected by the full node will be stored and can be retrieved by clients') - for description in args.listen: self.manager.add_listen_address(description) diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py index 2ccf3f8cc..8632237f7 100644 --- a/hathor/cli/run_node.py +++ b/hathor/cli/run_node.py @@ -96,6 +96,7 @@ def create_parser(cls) -> ArgumentParser: parser.add_argument('--x-localhost-only', action='store_true', help='Only connect to peers on localhost') parser.add_argument('--x-rocksdb-indexes', action='store_true', help=SUPPRESS) parser.add_argument('--x-enable-event-queue', action='store_true', help='Enable event queue mechanism') + parser.add_argument('--x-reset-event-queue', action='store_true', help='Reset the event queue') parser.add_argument('--peer-id-blacklist', action='extend', default=[], nargs='+', type=str, help='Peer IDs to forbid connection') return parser diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index ff3197b29..f8a25411a 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -83,7 +83,7 @@ def start(self, peer_id: str) -> None: self._previous_node_state = self._event_storage.get_node_state() if self._should_reload_events(): - self._event_storage.clear_events() + self._event_storage.reset_events() else: self._last_event = self._event_storage.get_last_event() self._last_existing_group_id = self._event_storage.get_last_group_id() @@ -218,6 +218,10 @@ def _handle_load_finished(self): def _should_reload_events(self) -> bool: return self._previous_node_state in [None, NodeState.LOAD] + def reset_all(self) -> None: + """Reset all data.""" + self._event_storage.reset_all() + def get_event_queue_state(self) -> bool: """Get whether the event queue feature is enabled from the storage""" return self._event_storage.get_event_queue_state() diff --git a/hathor/event/model/EventQueueOptions.py b/hathor/event/model/EventQueueOptions.py new file mode 100644 index 000000000..ef0da7b53 --- /dev/null +++ b/hathor/event/model/EventQueueOptions.py @@ -0,0 +1,20 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.utils.pydantic import BaseModel + + +class EventQueueOptions(BaseModel): + enable: bool = False + reset: bool = False diff --git a/hathor/event/model/event_data.py b/hathor/event/model/event_data.py index f794be3a0..f5c341593 100644 --- a/hathor/event/model/event_data.py +++ b/hathor/event/model/event_data.py @@ -88,7 +88,7 @@ def from_event_arguments(cls, args: EventArguments) -> 'EmptyData': class TxData(BaseEventData, extra=Extra.ignore): hash: str - nonce: int + nonce: Optional[int] = None timestamp: int version: int weight: float @@ -100,6 +100,7 @@ class TxData(BaseEventData, extra=Extra.ignore): token_name: Optional[str] token_symbol: Optional[str] metadata: 'TxMetadata' + aux_pow: Optional[str] = None @classmethod def from_event_arguments(cls, args: EventArguments) -> 'TxData': diff --git a/hathor/event/storage/event_storage.py b/hathor/event/storage/event_storage.py index 90a68d761..a24436325 100644 --- a/hathor/event/storage/event_storage.py +++ b/hathor/event/storage/event_storage.py @@ -46,8 +46,13 @@ def iter_from_event(self, key: int) -> Iterator[BaseEvent]: raise NotImplementedError @abstractmethod - def clear_events(self) -> None: - """Clear all stored events and related metadata.""" + def reset_events(self) -> None: + """Reset stored events and related metadata.""" + raise NotImplementedError + + @abstractmethod + def reset_all(self) -> None: + """Reset all data.""" raise NotImplementedError @abstractmethod diff --git a/hathor/event/storage/memory_storage.py b/hathor/event/storage/memory_storage.py index e3473eec3..19495c0e6 100644 --- a/hathor/event/storage/memory_storage.py +++ b/hathor/event/storage/memory_storage.py @@ -58,11 +58,16 @@ def iter_from_event(self, key: int) -> Iterator[BaseEvent]: yield self._events[key] key += 1 - def clear_events(self) -> None: + def reset_events(self) -> None: self._events = [] self._last_event = None self._last_group_id = None + def reset_all(self) -> None: + self.reset_events() + self._node_state = None + self._event_queue_enabled = False + def save_node_state(self, state: NodeState) -> None: self._node_state = state diff --git a/hathor/event/storage/rocksdb_storage.py b/hathor/event/storage/rocksdb_storage.py index eae5f5305..edda8dcd0 100644 --- a/hathor/event/storage/rocksdb_storage.py +++ b/hathor/event/storage/rocksdb_storage.py @@ -91,7 +91,7 @@ def get_last_event(self) -> Optional[BaseEvent]: def get_last_group_id(self) -> Optional[int]: return self._last_group_id - def clear_events(self) -> None: + def reset_events(self) -> None: self._last_event = None self._last_group_id = None @@ -100,6 +100,11 @@ def clear_events(self) -> None: self._cf_event = self._rocksdb_storage.get_or_create_column_family(_CF_NAME_EVENT) + def reset_all(self) -> None: + self.reset_events() + self._db.delete((self._cf_meta, _KEY_NODE_STATE)) + self._db.delete((self._cf_meta, _KEY_EVENT_QUEUE_ENABLED)) + def save_node_state(self, state: NodeState) -> None: self._db.put((self._cf_meta, _KEY_NODE_STATE), int_to_bytes(state.value, 8)) diff --git a/hathor/manager.py b/hathor/manager.py index 916a92bc6..f94c5febd 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -30,6 +30,7 @@ from hathor.conf import HathorSettings from hathor.consensus import ConsensusAlgorithm from hathor.event.event_manager import EventManager +from hathor.event.model.EventQueueOptions import EventQueueOptions from hathor.exception import ( DoubleSpendingError, HathorError, @@ -100,7 +101,7 @@ def __init__(self, rng: Optional[Random] = None, environment_info: Optional[EnvironmentInfo] = None, full_verification: bool = False, - enable_event_queue: bool = False): + event_queue_options: EventQueueOptions = EventQueueOptions()): """ :param reactor: Twisted reactor which handles the mainloop and the events. :param peer_id: Id of this node. @@ -123,7 +124,10 @@ def __init__(self, if not (enable_sync_v1 or enable_sync_v2): raise TypeError(f'{type(self).__name__}() at least one sync version is required') - if event_manager.get_event_queue_state() is True and not enable_event_queue: + if event_queue_options.reset: + event_manager.reset_all() + + if event_manager.get_event_queue_state() is True and not event_queue_options.enable: raise ValueError( 'cannot start manager without event queue feature, as it was enabled in the previous startup' ) @@ -173,8 +177,8 @@ def __init__(self, self.tx_storage.pubsub = self.pubsub self._event_manager = event_manager - self._event_manager.save_event_queue_state(enable_event_queue) - self._enable_event_queue = enable_event_queue + self._event_manager.save_event_queue_state(event_queue_options.enable) + self._event_queue_options = event_queue_options if enable_sync_v2: assert self.tx_storage.indexes is not None @@ -227,10 +231,6 @@ def __init__(self, # Can be activated on the command line with --full-verification self._full_verification = full_verification - # Activated with --x-enable-event-queue flag - # It activates the event mechanism inside full node - self.enable_event_queue = False - # List of whitelisted peers self.peers_whitelist: List[str] = [] @@ -284,7 +284,7 @@ def start(self) -> None: ) sys.exit(-1) - if self._enable_event_queue: + if self._event_queue_options.enable: self._event_manager.start(not_none(self.my_peer.id)) self.state = self.NodeState.INITIALIZING @@ -354,7 +354,7 @@ def stop(self) -> Deferred: if wait_stratum: waits.append(wait_stratum) - if self._enable_event_queue: + if self._event_queue_options.enable: self._event_manager.stop() self.tx_storage.flush() @@ -395,7 +395,7 @@ def _initialize_components(self) -> None: This method runs through all transactions, verifying them and updating our wallet. """ - assert not self._enable_event_queue, 'this method cannot be used if the events feature is enabled.' + assert not self._event_queue_options.enable, 'this method cannot be used if the events feature is enabled.' self.log.info('initialize') if self.wallet: @@ -649,7 +649,7 @@ def _initialize_components_new(self) -> None: # XXX: last step before actually starting is updating the last started at timestamps self.tx_storage.update_last_started_at(started_at) - if self._enable_event_queue: + if self._event_queue_options.enable: topological_iterator = self.tx_storage.topological_iterator() self._event_manager.handle_load_phase_vertices(topological_iterator) diff --git a/tests/event/test_event_storage.py b/tests/event/test_event_storage.py index 4889bcb15..84427e1d7 100644 --- a/tests/event/test_event_storage.py +++ b/tests/event/test_event_storage.py @@ -137,11 +137,14 @@ def test_save_event_queue_disabled_and_retrieve(self): assert enabled is False - def test_clear_events_empty_database(self): - self._test_clear_events() + def test_reset_events_empty_database(self): + self._test_reset_events() - def _test_clear_events(self) -> None: - self.event_storage.clear_events() + def test_reset_all_empty_database(self): + self._test_reset_events() + + def _test_reset_events(self) -> None: + self.event_storage.reset_events() events = list(self.event_storage.iter_from_event(0)) last_event = self.event_storage.get_last_event() @@ -151,7 +154,22 @@ def _test_clear_events(self) -> None: assert last_event is None assert last_group_id is None - def test_clear_events_full_database(self): + def _test_reset_all(self) -> None: + self.event_storage.reset_all() + + events = list(self.event_storage.iter_from_event(0)) + last_event = self.event_storage.get_last_event() + last_group_id = self.event_storage.get_last_group_id() + node_state = self.event_storage.get_node_state() + event_queue_state = self.event_storage.get_event_queue_state() + + assert events == [] + assert last_event is None + assert last_group_id is None + assert node_state is None + assert event_queue_state is False + + def test_reset_events_full_database(self): n_events = 10 expected_last_group_id = 4 expected_node_state = NodeState.SYNC @@ -170,7 +188,7 @@ def test_clear_events_full_database(self): assert node_state == expected_node_state assert event_queue_state is True - self._test_clear_events() + self._test_reset_events() node_state = self.event_storage.get_node_state() event_queue_state = self.event_storage.get_event_queue_state() @@ -178,6 +196,33 @@ def test_clear_events_full_database(self): assert node_state == expected_node_state assert event_queue_state is True + def test_reset_all_full_database(self): + n_events = 10 + expected_last_group_id = 4 + expected_node_state = NodeState.SYNC + + self._populate_events_and_last_group_id(n_events=n_events, last_group_id=4) + self.event_storage.save_node_state(expected_node_state) + self.event_storage.save_event_queue_state(True) + + events = list(self.event_storage.iter_from_event(0)) + last_group_id = self.event_storage.get_last_group_id() + node_state = self.event_storage.get_node_state() + event_queue_state = self.event_storage.get_event_queue_state() + + assert len(events) == n_events + assert last_group_id == expected_last_group_id + assert node_state == expected_node_state + assert event_queue_state is True + + self._test_reset_all() + + node_state = self.event_storage.get_node_state() + event_queue_state = self.event_storage.get_event_queue_state() + + assert node_state is None + assert event_queue_state is False + @pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb') class EventStorageRocksDBTest(EventStorageBaseTest): diff --git a/tests/others/test_cli_builder.py b/tests/others/test_cli_builder.py index a4cbc09bb..91bc19c6e 100644 --- a/tests/others/test_cli_builder.py +++ b/tests/others/test_cli_builder.py @@ -4,6 +4,7 @@ from hathor.builder import CliBuilder from hathor.event import EventManager +from hathor.event.model.EventQueueOptions import EventQueueOptions from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage from hathor.event.websocket import EventWebsocketFactory from hathor.exception import BuilderError @@ -55,7 +56,7 @@ def test_all_default(self): self.assertNotIn(SyncVersion.V2, manager.connections._sync_factories) self.assertFalse(self.builder._build_prometheus) self.assertFalse(self.builder._build_status) - self.assertFalse(manager._enable_event_queue) + self.assertEqual(EventQueueOptions(enable=False, reset=False), manager._event_queue_options) @pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb') def test_cache_storage(self): @@ -152,6 +153,7 @@ def test_event_queue_with_rocksdb_storage(self): self.assertIsInstance(manager._event_manager, EventManager) self.assertIsInstance(manager._event_manager._event_storage, EventRocksDBStorage) self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory) + self.assertEqual(EventQueueOptions(enable=True, reset=False), manager._event_queue_options) def test_event_queue_with_memory_storage(self): manager = self._build(['--x-enable-event-queue', '--memory-storage']) @@ -159,7 +161,16 @@ def test_event_queue_with_memory_storage(self): self.assertIsInstance(manager._event_manager, EventManager) self.assertIsInstance(manager._event_manager._event_storage, EventMemoryStorage) self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory) + self.assertEqual(EventQueueOptions(enable=True, reset=False), manager._event_queue_options) def test_event_queue_with_full_verification(self): args = ['--x-enable-event-queue', '--memory-storage', '--x-full-verification'] self._build_with_error(args, '--x-full-verification cannot be used with --x-enable-event-queue') + + def test_reset_event_queue(self): + manager = self._build(['--x-reset-event-queue', '--memory-storage']) + self.assertEqual(EventQueueOptions(enable=False, reset=True), manager._event_queue_options) + + def test_reset_and_enable_event_queue(self): + manager = self._build(['--x-reset-event-queue', '--x-enable-event-queue', '--memory-storage']) + self.assertEqual(EventQueueOptions(enable=True, reset=True), manager._event_queue_options)