From a83773d90bdf7a84264ee191f8efdaf25ebd1097 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Thu, 13 Apr 2023 00:20:15 -0300 Subject: [PATCH 01/21] feat(events): remove --x-emit-load-events CLI flag --- hathor/builder/cli_builder.py | 8 ++------ hathor/cli/run_node.py | 2 -- hathor/event/event_manager.py | 9 ++++----- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index 7f22dfa42..4eebe317c 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -143,12 +143,8 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa event_storage=event_storage, event_ws_factory=self.event_ws_factory, pubsub=pubsub, - reactor=reactor, - emit_load_events=args.x_emit_load_events + reactor=reactor ) - else: - self.check_or_raise(not args.x_emit_load_events, '--x-emit-load-events cannot be used without ' - '--x-enable-event-queue') if args.wallet_index and tx_storage.indexes is not None: self.log.debug('enable wallet indexes') @@ -232,7 +228,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa self.manager.enable_event_queue = True self.log.info('--x-enable-event-queue flag provided. ' - 'The events detected by the full node will be stored and retrieved to clients') + 'The events detected by the full node will be stored and can be retrieved by clients') for description in args.listen: self.manager.add_listen_address(description) diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py index 8bd88e012..809592eb4 100644 --- a/hathor/cli/run_node.py +++ b/hathor/cli/run_node.py @@ -100,8 +100,6 @@ def create_parser(cls) -> ArgumentParser: parser.add_argument('--x-localhost-only', action='store_true', help='Only connect to peers on localhost') parser.add_argument('--x-rocksdb-indexes', action='store_true', help=SUPPRESS) parser.add_argument('--x-enable-event-queue', action='store_true', help='Enable event queue mechanism') - parser.add_argument('--x-emit-load-events', action='store_true', help='Enable emission of events during the ' - 'LOAD phase') parser.add_argument('--peer-id-blacklist', action='extend', default=[], nargs='+', type=str, help='Peer IDs to forbid connection') return parser diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index 4ab06cbe4..a901bda74 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -62,8 +62,7 @@ def __init__( event_storage: EventStorage, event_ws_factory: EventWebsocketFactory, pubsub: PubSubManager, - reactor: Reactor, - emit_load_events: bool = False + reactor: Reactor ): self.log = logger.new() @@ -71,7 +70,6 @@ def __init__( self._event_storage = event_storage self._event_ws_factory = event_ws_factory self._pubsub = pubsub - self.emit_load_events = emit_load_events self._last_event = self._event_storage.get_last_event() self._last_existing_group_id = self._event_storage.get_last_group_id() @@ -123,8 +121,9 @@ def _handle_event(self, hathor_event: HathorEvents, event_args: EventArguments) if event_specific_handler := event_specific_handlers.get(event_type): event_specific_handler() - if not self._load_finished and not self.emit_load_events: - return + # TODO: Are there any events being emitted during the load phase that are not emitted from this class? + # if not self._load_finished: + # return self._handle_event_creation(event_type, event_args) From 2546b882df13fc3d89199cae664c1a5d0a8fc471 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Thu, 13 Apr 2023 00:22:56 -0300 Subject: [PATCH 02/21] feat(events): implement node state related methods on EventStorage --- hathor/event/model/node_state.py | 20 ++++++++++++++++++++ hathor/event/storage/event_storage.py | 16 ++++++++++++++++ hathor/event/storage/memory_storage.py | 13 +++++++++++++ hathor/event/storage/rocksdb_storage.py | 21 ++++++++++++++++++--- hathor/transaction/util.py | 4 ++++ tests/event/test_event_storage.py | 1 + 6 files changed, 72 insertions(+), 3 deletions(-) create mode 100644 hathor/event/model/node_state.py diff --git a/hathor/event/model/node_state.py b/hathor/event/model/node_state.py new file mode 100644 index 000000000..b785ddd3a --- /dev/null +++ b/hathor/event/model/node_state.py @@ -0,0 +1,20 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum, auto + + +class NodeState(Enum): + LOAD = auto() + SYNC = auto() diff --git a/hathor/event/storage/event_storage.py b/hathor/event/storage/event_storage.py index c91b14d79..2f963559b 100644 --- a/hathor/event/storage/event_storage.py +++ b/hathor/event/storage/event_storage.py @@ -16,6 +16,7 @@ from typing import Iterator, Optional from hathor.event.model.base_event import BaseEvent +from hathor.event.model.node_state import NodeState class EventStorage(ABC): @@ -43,3 +44,18 @@ def get_last_group_id(self) -> Optional[int]: def iter_from_event(self, key: int) -> Iterator[BaseEvent]: """ Iterate through events starting from the event with the given key""" raise NotImplementedError + + @abstractmethod + def clear_events(self) -> None: + """Clear all stored events and related metadata.""" + raise NotImplementedError + + @abstractmethod + def save_node_state(self, state: NodeState) -> None: + """Save a node state in the storage""" + raise NotImplementedError + + @abstractmethod + def get_node_state(self) -> Optional[NodeState]: + """Get the node state from the storage""" + raise NotImplementedError diff --git a/hathor/event/storage/memory_storage.py b/hathor/event/storage/memory_storage.py index 3d28a2035..05843a727 100644 --- a/hathor/event/storage/memory_storage.py +++ b/hathor/event/storage/memory_storage.py @@ -15,6 +15,7 @@ from typing import Iterator, List, Optional from hathor.event.model.base_event import BaseEvent +from hathor.event.model.node_state import NodeState from hathor.event.storage.event_storage import EventStorage @@ -23,6 +24,7 @@ def __init__(self) -> None: self._events: List[BaseEvent] = [] self._last_event: Optional[BaseEvent] = None self._last_group_id: Optional[int] = None + self._node_state: Optional[NodeState] = None def save_event(self, event: BaseEvent) -> None: if event.id != len(self._events): @@ -54,3 +56,14 @@ def iter_from_event(self, key: int) -> Iterator[BaseEvent]: while key < len(self._events): yield self._events[key] key += 1 + + def clear_events(self) -> None: + self._events = [] + self._last_event = None + self._last_group_id = None + + def save_node_state(self, state: NodeState) -> None: + self._node_state = state + + def get_node_state(self) -> Optional[NodeState]: + return self._node_state diff --git a/hathor/event/storage/rocksdb_storage.py b/hathor/event/storage/rocksdb_storage.py index 5f5f02424..68278fb3c 100644 --- a/hathor/event/storage/rocksdb_storage.py +++ b/hathor/event/storage/rocksdb_storage.py @@ -15,14 +15,16 @@ from typing import Iterator, Optional from hathor.event.model.base_event import BaseEvent +from hathor.event.model.node_state import NodeState from hathor.event.storage.event_storage import EventStorage from hathor.storage.rocksdb_storage import RocksDBStorage -from hathor.transaction.util import int_to_bytes +from hathor.transaction.util import bytes_to_int, int_to_bytes from hathor.util import json_dumpb _CF_NAME_EVENT = b'event' _CF_NAME_META = b'event-metadata' _KEY_LAST_GROUP_ID = b'last-group-id' +_KEY_NODE_STATE = b'node-state' class EventRocksDBStorage(EventStorage): @@ -57,11 +59,11 @@ def _db_get_last_group_id(self) -> Optional[int]: last_group_id = self._db.get((self._cf_meta, _KEY_LAST_GROUP_ID)) if last_group_id is None: return None - return int.from_bytes(last_group_id, byteorder='big', signed=False) + return bytes_to_int(last_group_id) def save_event(self, event: BaseEvent) -> None: if (self._last_event is None and event.id != 0) or \ - (self._last_event is not None and event.id > self._last_event.id + 1): + (self._last_event is not None and event.id != self._last_event.id + 1): raise ValueError('invalid event.id, ids must be sequential and leave no gaps') event_data = json_dumpb(event.dict()) key = int_to_bytes(event.id, 8) @@ -84,3 +86,16 @@ def get_last_event(self) -> Optional[BaseEvent]: def get_last_group_id(self) -> Optional[int]: return self._last_group_id + + def clear_events(self) -> None: + self._db.drop_column_family(self._cf_event) + self._db.delete((self._cf_meta, _KEY_LAST_GROUP_ID)) + + def save_node_state(self, state: NodeState) -> None: + self._db.put((self._cf_meta, _KEY_NODE_STATE), int_to_bytes(state.value, 8)) + + def get_node_state(self) -> Optional[NodeState]: + node_state_bytes = self._db.get((self._cf_meta, _KEY_NODE_STATE)) + node_state_int = bytes_to_int(node_state_bytes) + + return NodeState(node_state_int) diff --git a/hathor/transaction/util.py b/hathor/transaction/util.py index 0f88ed86c..0ce21a95b 100644 --- a/hathor/transaction/util.py +++ b/hathor/transaction/util.py @@ -28,6 +28,10 @@ def int_to_bytes(number: int, size: int, signed: bool = False) -> bytes: return number.to_bytes(size, byteorder='big', signed=signed) +def bytes_to_int(data: bytes, signed: bool = False) -> int: + return int.from_bytes(data, byteorder='big', signed=signed) + + def unpack(fmt: str, buf: bytes) -> Any: size = struct.calcsize(fmt) return struct.unpack(fmt, buf[:size]), buf[size:] diff --git a/tests/event/test_event_storage.py b/tests/event/test_event_storage.py index f2441f3a5..3cd9fb45a 100644 --- a/tests/event/test_event_storage.py +++ b/tests/event/test_event_storage.py @@ -9,6 +9,7 @@ from tests.utils import HAS_ROCKSDB, EventMocker +# TODO: Implement tests for new methods class EventStorageBaseTest(unittest.TestCase): __test__ = False From a2d046e037ca5ed454ce8f5c13dcffdfc754bb34 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Thu, 13 Apr 2023 00:24:24 -0300 Subject: [PATCH 03/21] feat(events): implement load phase related methods on EventManager --- hathor/event/event_manager.py | 44 ++++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index a901bda74..0de36818b 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -12,15 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Optional +from typing import Callable, Iterator, Optional from structlog import get_logger from hathor.event.model.base_event import BaseEvent from hathor.event.model.event_type import EventType +from hathor.event.model.node_state import NodeState from hathor.event.storage import EventStorage from hathor.event.websocket import EventWebsocketFactory from hathor.pubsub import EventArguments, HathorEvents, PubSubManager +from hathor.transaction import BaseTransaction from hathor.util import Reactor logger = get_logger() @@ -51,7 +53,7 @@ class EventManager: _peer_id: str _is_running: bool = False - _load_finished: bool = False + _previous_node_state: Optional[NodeState] = None @property def event_storage(self) -> EventStorage: @@ -80,6 +82,11 @@ def __init__( def start(self, peer_id: str) -> None: assert self._is_running is False, 'Cannot start, EventManager is already running' + self._previous_node_state = self._event_storage.get_node_state() + + if self._should_reload_events(): + self._event_storage.clear_events() + self._peer_id = peer_id self._event_ws_factory.start() self._is_running = True @@ -108,13 +115,18 @@ def _subscribe_events(self): """ Subscribe to defined events for the pubsub received """ for event in _SUBSCRIBE_EVENTS: - self._pubsub.subscribe(event, self._handle_event) + self._pubsub.subscribe(event, self._handle_hathor_event) + + def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None: + event_type = EventType.from_hathor_event(hathor_event) - def _handle_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None: + self._handle_event(event_type, event_args) + + def _handle_event(self, event_type: EventType, event_args: EventArguments) -> None: assert self._is_running, 'Cannot handle event, EventManager is not started.' - event_type = EventType.from_hathor_event(hathor_event) event_specific_handlers = { + EventType.LOAD_STARTED: self._handle_load_started, EventType.LOAD_FINISHED: self._handle_load_finished } @@ -180,9 +192,6 @@ def _create_non_group_edge_event(self, event_type: EventType, event_args: EventA group_id=group_id, ) - def _handle_load_finished(self): - self._load_finished = True - def _create_event( self, event_type: EventType, @@ -197,3 +206,22 @@ def _create_event( event_args=event_args, group_id=group_id, ) + + def _handle_load_started(self): + self._event_storage.save_node_state(NodeState.LOAD) + + def _handle_load_finished(self): + self._event_storage.save_node_state(NodeState.SYNC) + + def _should_reload_events(self) -> bool: + return self._previous_node_state in [None, NodeState.LOAD] + + def handle_load_phase_events(self, topological_iterator: Iterator[BaseTransaction]) -> None: + """Either generates load phase events or not, depending on previous node state.""" + + if not self._should_reload_events(): + return + + for vertex in topological_iterator: + args = EventArguments(tx=vertex) + self._handle_event(EventType.NEW_VERTEX_ACCEPTED, args) From ec1eea6369adcb82b64e6936332ad22c9c30ca1b Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Thu, 13 Apr 2023 00:25:15 -0300 Subject: [PATCH 04/21] feat(events): implement load phase event handling on HathorManager --- hathor/manager.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/hathor/manager.py b/hathor/manager.py index 642c0d775..65b8f0774 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -649,6 +649,12 @@ def _initialize_components_new(self) -> None: # XXX: last step before actually starting is updating the last started at timestamps self.tx_storage.update_last_started_at(started_at) + + # TODO: Can be moved to the outer method, allowing full verification? I think so, but publish(HathorEvents.LOAD_FINISHED) also has to be moved + if self._event_manager: + topological_iterator = self.tx_storage.topological_iterator() + self._event_manager.handle_load_phase_events(topological_iterator) + self.state = self.NodeState.READY self.pubsub.publish(HathorEvents.LOAD_FINISHED) From 6ae1b3bdcaf69ff201229e222304227a18f6e296 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Thu, 13 Apr 2023 00:33:58 -0300 Subject: [PATCH 05/21] feat(events): update tests --- hathor/manager.py | 3 ++- tests/event/test_event_manager.py | 31 +++++++++++++++++++++++-------- tests/event/test_event_reorg.py | 6 ++++++ 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/hathor/manager.py b/hathor/manager.py index 65b8f0774..b2a2380b0 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -650,7 +650,8 @@ def _initialize_components_new(self) -> None: # XXX: last step before actually starting is updating the last started at timestamps self.tx_storage.update_last_started_at(started_at) - # TODO: Can be moved to the outer method, allowing full verification? I think so, but publish(HathorEvents.LOAD_FINISHED) also has to be moved + # TODO: Can be moved to the outer method, allowing full verification? I think so, but publish( + # HathorEvents.LOAD_FINISHED) also has to be moved if self._event_manager: topological_iterator = self.tx_storage.topological_iterator() self._event_manager.handle_load_phase_events(topological_iterator) diff --git a/tests/event/test_event_manager.py b/tests/event/test_event_manager.py index 1b4f7250f..051f48aeb 100644 --- a/tests/event/test_event_manager.py +++ b/tests/event/test_event_manager.py @@ -7,6 +7,8 @@ from tests import unittest +# TODO: This test fails with Clock because of pubsub event ordering caused by +# ReactorThread.get_current_thread. Changing MAIN_THREAD to NOT_RUNNING makes it work class BaseEventManagerTest(unittest.TestCase): __test__ = False @@ -42,19 +44,32 @@ def test_event_group(self): self._fake_reorg_started() self._fake_reorg_finished() self.run_to_completion() + event0 = self.event_storage.get_event(0) event1 = self.event_storage.get_event(1) event2 = self.event_storage.get_event(2) event3 = self.event_storage.get_event(3) event4 = self.event_storage.get_event(4) - self.assertEqual(EventType(event0.type), EventType.LOAD_FINISHED) - self.assertEqual(EventType(event1.type), EventType.REORG_STARTED) - self.assertIsNotNone(event1.group_id) - self.assertEqual(EventType(event2.type), EventType.REORG_FINISHED) - self.assertIsNotNone(event2.group_id) - self.assertEqual(event1.group_id, event2.group_id) - self.assertNotEqual(event2.group_id, event3.group_id) - self.assertEqual(event3.group_id, event4.group_id) + event5 = self.event_storage.get_event(5) + event6 = self.event_storage.get_event(6) + event7 = self.event_storage.get_event(7) + event8 = self.event_storage.get_event(8) + + self.assertEqual(EventType(event0.type), EventType.LOAD_STARTED) + self.assertEqual(EventType(event1.type), EventType.NEW_VERTEX_ACCEPTED) + self.assertEqual(EventType(event2.type), EventType.NEW_VERTEX_ACCEPTED) + self.assertEqual(EventType(event3.type), EventType.NEW_VERTEX_ACCEPTED) + self.assertEqual(EventType(event4.type), EventType.LOAD_FINISHED) + self.assertEqual(EventType(event5.type), EventType.REORG_STARTED) + + self.assertIsNotNone(event5.group_id) + self.assertEqual(EventType(event6.type), EventType.REORG_FINISHED) + self.assertIsNotNone(event6.group_id) + self.assertEqual(event5.group_id, event6.group_id) + + self.assertNotEqual(event6.group_id, event7.group_id) + self.assertIsNotNone(event7.group_id) + self.assertEqual(event7.group_id, event8.group_id) def test_cannot_start_group_twice(self): self._fake_reorg_started() diff --git a/tests/event/test_event_reorg.py b/tests/event/test_event_reorg.py index fa69a4fac..f69bc36ea 100644 --- a/tests/event/test_event_reorg.py +++ b/tests/event/test_event_reorg.py @@ -10,6 +10,8 @@ settings = HathorSettings() +# TODO: This test fails with Clock because of pubsub event ordering caused by +# ReactorThread.get_current_thread. Changing MAIN_THREAD to NOT_RUNNING makes it work class BaseEventReorgTest(unittest.TestCase): __test__ = False @@ -58,6 +60,10 @@ class unsorted(list): pass expected_events_grouped = [ [ + (EventType.LOAD_STARTED, {}), + (EventType.NEW_VERTEX_ACCEPTED, {}), + (EventType.NEW_VERTEX_ACCEPTED, {}), + (EventType.NEW_VERTEX_ACCEPTED, {}), (EventType.LOAD_FINISHED, {}) ], # XXX: the order of the following events can vary depending on which genesis is spent/confirmed first From b8b3245d21a4e16690e3ae7b902820437749fb0f Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Fri, 14 Apr 2023 16:48:45 -0300 Subject: [PATCH 06/21] feat(events): implement event storage tests for new methods --- tests/event/test_event_storage.py | 58 ++++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 5 deletions(-) diff --git a/tests/event/test_event_storage.py b/tests/event/test_event_storage.py index 3cd9fb45a..782bcf260 100644 --- a/tests/event/test_event_storage.py +++ b/tests/event/test_event_storage.py @@ -1,7 +1,9 @@ import tempfile +from typing import Optional import pytest +from hathor.event.model.node_state import NodeState from hathor.event.storage.memory_storage import EventMemoryStorage from hathor.event.storage.rocksdb_storage import EventRocksDBStorage from hathor.storage.rocksdb_storage import RocksDBStorage @@ -9,7 +11,6 @@ from tests.utils import HAS_ROCKSDB, EventMocker -# TODO: Implement tests for new methods class EventStorageBaseTest(unittest.TestCase): __test__ = False @@ -93,15 +94,62 @@ def test_iter_from_event_negative_key(self): def test_save_events_and_retrieve_last_group_id(self): expected_group_id = 4 - for i in range(10): - group_id = i if i <= expected_group_id else None - event = self.event_mocker.generate_mocked_event(i, group_id) - self.event_storage.save_event(event) + + self._populate_events_and_last_group_id(n_events=10, last_group_id=expected_group_id) actual_group_id = self.event_storage.get_last_group_id() assert expected_group_id == actual_group_id + def _populate_events_and_last_group_id(self, n_events: int, last_group_id: int) -> None: + for i in range(n_events): + group_id = i if i <= last_group_id else None + event = self.event_mocker.generate_mocked_event(i, group_id) + self.event_storage.save_event(event) + + def test_get_empty_node_state(self): + node_state = self.event_storage.get_node_state() + + assert node_state is None + + def test_save_node_state_and_retrieve(self): + self.event_storage.save_node_state(NodeState.SYNC) + node_state = self.event_storage.get_node_state() + + assert node_state == NodeState.SYNC + + def test_clear_events_empty_database(self): + self._test_clear_events() + + def _test_clear_events(self, expected_node_state: Optional[NodeState] = None): + self.event_storage.clear_events() + + events = list(self.event_storage.iter_from_event(0)) + last_group_id = self.event_storage.get_last_group_id() + node_state = self.event_storage.get_node_state() + + assert events == [] + assert last_group_id is None + assert node_state == expected_node_state + + def test_clear_events_full_database(self): + n_events = 10 + expected_last_group_id = 4 + expected_node_state = NodeState.SYNC + + self._populate_events_and_last_group_id(n_events=n_events, last_group_id=4) + self.event_storage.save_node_state(expected_node_state) + + events = list(self.event_storage.iter_from_event(0)) + last_group_id = self.event_storage.get_last_group_id() + node_state = self.event_storage.get_node_state() + + assert len(events) == n_events + assert last_group_id == expected_last_group_id + assert node_state == expected_node_state + + self._test_clear_events(expected_node_state) + @pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb') class EventStorageRocksDBTest(EventStorageBaseTest): From 2a779574086234d3f131b785c4fe700819bded2b Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Fri, 14 Apr 2023 16:49:24 -0300 Subject: [PATCH 07/21] feat(events): fix rocksdb event storage bugs --- hathor/event/storage/rocksdb_storage.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/hathor/event/storage/rocksdb_storage.py b/hathor/event/storage/rocksdb_storage.py index 68278fb3c..eb1de2bd0 100644 --- a/hathor/event/storage/rocksdb_storage.py +++ b/hathor/event/storage/rocksdb_storage.py @@ -29,9 +29,12 @@ class EventRocksDBStorage(EventStorage): def __init__(self, rocksdb_storage: RocksDBStorage): - self._db = rocksdb_storage.get_db() - self._cf_event = rocksdb_storage.get_or_create_column_family(_CF_NAME_EVENT) - self._cf_meta = rocksdb_storage.get_or_create_column_family(_CF_NAME_META) + self._rocksdb_storage = rocksdb_storage + + self._db = self._rocksdb_storage.get_db() + self._cf_event = self._rocksdb_storage.get_or_create_column_family(_CF_NAME_EVENT) + self._cf_meta = self._rocksdb_storage.get_or_create_column_family(_CF_NAME_META) + self._last_event: Optional[BaseEvent] = self._db_get_last_event() self._last_group_id: Optional[int] = self._db_get_last_group_id() @@ -88,14 +91,23 @@ def get_last_group_id(self) -> Optional[int]: return self._last_group_id def clear_events(self) -> None: - self._db.drop_column_family(self._cf_event) + self._last_event = None + self._last_group_id = None + self._db.delete((self._cf_meta, _KEY_LAST_GROUP_ID)) + self._db.drop_column_family(self._cf_event) + + self._cf_event = self._rocksdb_storage.get_or_create_column_family(_CF_NAME_EVENT) def save_node_state(self, state: NodeState) -> None: self._db.put((self._cf_meta, _KEY_NODE_STATE), int_to_bytes(state.value, 8)) def get_node_state(self) -> Optional[NodeState]: node_state_bytes = self._db.get((self._cf_meta, _KEY_NODE_STATE)) + + if node_state_bytes is None: + return None + node_state_int = bytes_to_int(node_state_bytes) return NodeState(node_state_int) From 8fb4f046fb35a1d7782899838a61303aad703296 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Fri, 14 Apr 2023 17:51:11 -0300 Subject: [PATCH 08/21] feat(events): fix bug on load phase events --- hathor/event/event_manager.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index 0de36818b..aeec4bf2b 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -54,6 +54,8 @@ class EventManager: _peer_id: str _is_running: bool = False _previous_node_state: Optional[NodeState] = None + _last_event: Optional[BaseEvent] = None + _last_existing_group_id: Optional[int] = None @property def event_storage(self) -> EventStorage: @@ -68,14 +70,11 @@ def __init__( ): self.log = logger.new() - self._clock = reactor + self._reactor = reactor self._event_storage = event_storage self._event_ws_factory = event_ws_factory self._pubsub = pubsub - self._last_event = self._event_storage.get_last_event() - self._last_existing_group_id = self._event_storage.get_last_group_id() - self._assert_closed_event_group() self._subscribe_events() @@ -86,6 +85,9 @@ def start(self, peer_id: str) -> None: if self._should_reload_events(): self._event_storage.clear_events() + else: + self._last_event = self._event_storage.get_last_event() + self._last_existing_group_id = self._event_storage.get_last_group_id() self._peer_id = peer_id self._event_ws_factory.start() @@ -201,7 +203,7 @@ def _create_event( return BaseEvent.from_event_arguments( event_id=0 if self._last_event is None else self._last_event.id + 1, peer_id=self._peer_id, - timestamp=self._clock.seconds(), + timestamp=self._reactor.seconds(), event_type=event_type, event_args=event_args, group_id=group_id, @@ -217,11 +219,19 @@ def _should_reload_events(self) -> bool: return self._previous_node_state in [None, NodeState.LOAD] def handle_load_phase_events(self, topological_iterator: Iterator[BaseTransaction]) -> None: - """Either generates load phase events or not, depending on previous node state.""" + """ + Either generates load phase events or not, depending on previous node state. + Does so asynchronously so events generated here are not processed before normal event handling. + """ + assert self._is_running, 'Cannot handle load phase events, EventManager is not started.' if not self._should_reload_events(): return for vertex in topological_iterator: args = EventArguments(tx=vertex) - self._handle_event(EventType.NEW_VERTEX_ACCEPTED, args) + + self._reactor.callLater( + delay=0, + callable=lambda: self._handle_event(EventType.NEW_VERTEX_ACCEPTED, args) + ) From 71bc1bd54d10c5504d77d2963a7f6f0e27774f34 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Mon, 17 Apr 2023 11:55:04 -0300 Subject: [PATCH 09/21] feat(events): remove TODOs --- hathor/event/event_manager.py | 4 ---- hathor/manager.py | 2 -- 2 files changed, 6 deletions(-) diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index aeec4bf2b..7b65f98f5 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -135,10 +135,6 @@ def _handle_event(self, event_type: EventType, event_args: EventArguments) -> No if event_specific_handler := event_specific_handlers.get(event_type): event_specific_handler() - # TODO: Are there any events being emitted during the load phase that are not emitted from this class? - # if not self._load_finished: - # return - self._handle_event_creation(event_type, event_args) def _handle_event_creation(self, event_type: EventType, event_args: EventArguments) -> None: diff --git a/hathor/manager.py b/hathor/manager.py index b2a2380b0..901d12ca0 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -650,8 +650,6 @@ def _initialize_components_new(self) -> None: # XXX: last step before actually starting is updating the last started at timestamps self.tx_storage.update_last_started_at(started_at) - # TODO: Can be moved to the outer method, allowing full verification? I think so, but publish( - # HathorEvents.LOAD_FINISHED) also has to be moved if self._event_manager: topological_iterator = self.tx_storage.topological_iterator() self._event_manager.handle_load_phase_events(topological_iterator) From b9914c9187873683b88705694bb8f6b37e9a94df Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Mon, 17 Apr 2023 12:03:04 -0300 Subject: [PATCH 10/21] feat(events): lint code and improve method name --- hathor/event/event_manager.py | 2 +- hathor/manager.py | 2 +- tests/event/test_event_manager.py | 2 -- tests/event/test_event_reorg.py | 2 -- tests/event/test_event_storage.py | 5 ++++- 5 files changed, 6 insertions(+), 7 deletions(-) diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index 7b65f98f5..e43ddd443 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -214,7 +214,7 @@ def _handle_load_finished(self): def _should_reload_events(self) -> bool: return self._previous_node_state in [None, NodeState.LOAD] - def handle_load_phase_events(self, topological_iterator: Iterator[BaseTransaction]) -> None: + def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransaction]) -> None: """ Either generates load phase events or not, depending on previous node state. Does so asynchronously so events generated here are not processed before normal event handling. diff --git a/hathor/manager.py b/hathor/manager.py index 901d12ca0..7762b659a 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -652,7 +652,7 @@ def _initialize_components_new(self) -> None: if self._event_manager: topological_iterator = self.tx_storage.topological_iterator() - self._event_manager.handle_load_phase_events(topological_iterator) + self._event_manager.handle_load_phase_vertices(topological_iterator) self.state = self.NodeState.READY self.pubsub.publish(HathorEvents.LOAD_FINISHED) diff --git a/tests/event/test_event_manager.py b/tests/event/test_event_manager.py index 051f48aeb..0f602a9fe 100644 --- a/tests/event/test_event_manager.py +++ b/tests/event/test_event_manager.py @@ -7,8 +7,6 @@ from tests import unittest -# TODO: This test fails with Clock because of pubsub event ordering caused by -# ReactorThread.get_current_thread. Changing MAIN_THREAD to NOT_RUNNING makes it work class BaseEventManagerTest(unittest.TestCase): __test__ = False diff --git a/tests/event/test_event_reorg.py b/tests/event/test_event_reorg.py index f69bc36ea..e99ec45c7 100644 --- a/tests/event/test_event_reorg.py +++ b/tests/event/test_event_reorg.py @@ -10,8 +10,6 @@ settings = HathorSettings() -# TODO: This test fails with Clock because of pubsub event ordering caused by -# ReactorThread.get_current_thread. Changing MAIN_THREAD to NOT_RUNNING makes it work class BaseEventReorgTest(unittest.TestCase): __test__ = False diff --git a/tests/event/test_event_storage.py b/tests/event/test_event_storage.py index 782bcf260..066baacc8 100644 --- a/tests/event/test_event_storage.py +++ b/tests/event/test_event_storage.py @@ -4,6 +4,7 @@ import pytest from hathor.event.model.node_state import NodeState +from hathor.event.storage import EventStorage from hathor.event.storage.memory_storage import EventMemoryStorage from hathor.event.storage.rocksdb_storage import EventRocksDBStorage from hathor.storage.rocksdb_storage import RocksDBStorage @@ -14,6 +15,8 @@ class EventStorageBaseTest(unittest.TestCase): __test__ = False + event_storage: EventStorage + def setUp(self): super().setUp() self.event_mocker = EventMocker(self.rng) @@ -121,7 +124,7 @@ def test_save_node_state_and_retrieve(self): def test_clear_events_empty_database(self): self._test_clear_events() - def _test_clear_events(self, expected_node_state: Optional[NodeState] = None): + def _test_clear_events(self, expected_node_state: Optional[NodeState] = None) -> None: self.event_storage.clear_events() events = list(self.event_storage.iter_from_event(0)) From 5e86209f5e3ed2d229ccc67dc2162eab9e5af699 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Tue, 18 Apr 2023 16:26:04 -0300 Subject: [PATCH 11/21] feat(events): add missing docstring and other minor changes --- hathor/event/model/node_state.py | 4 ++-- hathor/transaction/util.py | 11 ++++++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/hathor/event/model/node_state.py b/hathor/event/model/node_state.py index b785ddd3a..33c0c283d 100644 --- a/hathor/event/model/node_state.py +++ b/hathor/event/model/node_state.py @@ -16,5 +16,5 @@ class NodeState(Enum): - LOAD = auto() - SYNC = auto() + LOAD = 0 + SYNC = 1 diff --git a/hathor/transaction/util.py b/hathor/transaction/util.py index 0ce21a95b..fb9b1b9c7 100644 --- a/hathor/transaction/util.py +++ b/hathor/transaction/util.py @@ -28,7 +28,16 @@ def int_to_bytes(number: int, size: int, signed: bool = False) -> bytes: return number.to_bytes(size, byteorder='big', signed=signed) -def bytes_to_int(data: bytes, signed: bool = False) -> int: +def bytes_to_int(data: bytes, *, signed: bool = False) -> int: + """ + Converts data in bytes to an int. Assumes big-endian format. + + Args: + data: bytes to be converted + signed: whether two's complement is used to represent the integer. + + Returns: the converted data as int + """ return int.from_bytes(data, byteorder='big', signed=signed) From f503b41faedf42d33183722cf9a6c8daa7593398 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Tue, 18 Apr 2023 17:03:30 -0300 Subject: [PATCH 12/21] feat(events): lint code --- hathor/event/model/node_state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hathor/event/model/node_state.py b/hathor/event/model/node_state.py index 33c0c283d..161e0061d 100644 --- a/hathor/event/model/node_state.py +++ b/hathor/event/model/node_state.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from enum import Enum, auto +from enum import Enum class NodeState(Enum): From 1201f533627f3b0f7fde63b51ee8e796ad14c822 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Wed, 19 Apr 2023 17:21:32 -0300 Subject: [PATCH 13/21] feat(events): fix bug in load phase events --- hathor/event/event_manager.py | 6 +++--- tests/event/test_event_reorg.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index e43ddd443..9c6c7ac67 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -225,9 +225,9 @@ def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransact return for vertex in topological_iterator: - args = EventArguments(tx=vertex) - self._reactor.callLater( delay=0, - callable=lambda: self._handle_event(EventType.NEW_VERTEX_ACCEPTED, args) + callable=self._handle_event, + event_type=EventType.NEW_VERTEX_ACCEPTED, + event_args=EventArguments(tx=vertex) ) diff --git a/tests/event/test_event_reorg.py b/tests/event/test_event_reorg.py index e99ec45c7..878e21b39 100644 --- a/tests/event/test_event_reorg.py +++ b/tests/event/test_event_reorg.py @@ -59,9 +59,9 @@ class unsorted(list): expected_events_grouped = [ [ (EventType.LOAD_STARTED, {}), - (EventType.NEW_VERTEX_ACCEPTED, {}), - (EventType.NEW_VERTEX_ACCEPTED, {}), - (EventType.NEW_VERTEX_ACCEPTED, {}), + (EventType.NEW_VERTEX_ACCEPTED, {'hash': settings.GENESIS_BLOCK_HASH.hex()}), + (EventType.NEW_VERTEX_ACCEPTED, {'hash': settings.GENESIS_TX1_HASH.hex()}), + (EventType.NEW_VERTEX_ACCEPTED, {'hash': settings.GENESIS_TX2_HASH.hex()}), (EventType.LOAD_FINISHED, {}) ], # XXX: the order of the following events can vary depending on which genesis is spent/confirmed first From 93c7d3aff2ea38e1056d7545479156d89a9826ad Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Wed, 19 Apr 2023 17:31:03 -0300 Subject: [PATCH 14/21] feat(events): remove wrong docstring --- hathor/pubsub.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/hathor/pubsub.py b/hathor/pubsub.py index 255088f80..0a0e0153e 100644 --- a/hathor/pubsub.py +++ b/hathor/pubsub.py @@ -90,9 +90,6 @@ class HathorEvents(Enum): REORG_FINISHED Triggered when consensus algorithm ends all changes involved in a reorg - - VERTEX_METADATA_CHANGED - Triggered when consensus algorithm changes a metadata of an existing vertex (transaction or block) """ MANAGER_ON_START = 'manager:on_start' MANAGER_ON_STOP = 'manager:on_stop' From 2ae0d54b16590b790246ceea615b032c9f639e4a Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Thu, 20 Apr 2023 14:56:46 -0300 Subject: [PATCH 15/21] feat(events): remove emit-load-events tests from test_cli_builder --- tests/others/test_cli_builder.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tests/others/test_cli_builder.py b/tests/others/test_cli_builder.py index eb248003e..956867fe7 100644 --- a/tests/others/test_cli_builder.py +++ b/tests/others/test_cli_builder.py @@ -163,7 +163,6 @@ def test_event_queue_with_rocksdb_storage(self): self.assertIsInstance(manager._event_manager, EventManager) self.assertIsInstance(manager._event_manager._event_storage, EventRocksDBStorage) self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory) - self.assertFalse(manager._event_manager.emit_load_events) def test_event_queue_with_memory_storage(self): manager = self._build(['--x-enable-event-queue', '--memory-storage']) @@ -171,16 +170,7 @@ def test_event_queue_with_memory_storage(self): self.assertIsInstance(manager._event_manager, EventManager) self.assertIsInstance(manager._event_manager._event_storage, EventMemoryStorage) self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory) - self.assertFalse(manager._event_manager.emit_load_events) def test_event_queue_with_full_verification(self): args = ['--x-enable-event-queue', '--memory-storage', '--x-full-verification'] self._build_with_error(args, '--x-full-verification cannot be used with --x-enable-event-queue') - - def test_event_queue_with_emit_load_events(self): - manager = self._build(['--x-enable-event-queue', '--memory-storage', '--x-emit-load-events']) - - self.assertIsInstance(manager._event_manager, EventManager) - self.assertIsInstance(manager._event_manager._event_storage, EventMemoryStorage) - self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory) - self.assertTrue(manager._event_manager.emit_load_events) From faa275428919c9c7dcbfcbb9785a0af15a9a8495 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Wed, 26 Apr 2023 12:24:40 -0300 Subject: [PATCH 16/21] feat(events): implement event queue feature state methods in event storage --- hathor/event/storage/event_storage.py | 15 +++++++++++ hathor/event/storage/memory_storage.py | 10 ++++++++ hathor/event/storage/rocksdb_storage.py | 21 +++++++++++++++ tests/event/test_event_storage.py | 34 ++++++++++++++++++++++--- 4 files changed, 76 insertions(+), 4 deletions(-) diff --git a/hathor/event/storage/event_storage.py b/hathor/event/storage/event_storage.py index 2f963559b..f659630c5 100644 --- a/hathor/event/storage/event_storage.py +++ b/hathor/event/storage/event_storage.py @@ -59,3 +59,18 @@ def save_node_state(self, state: NodeState) -> None: def get_node_state(self) -> Optional[NodeState]: """Get the node state from the storage""" raise NotImplementedError + + @abstractmethod + def save_event_queue_enabled(self) -> None: + """Save that event queue feature is enabled in the storage""" + raise NotImplementedError + + @abstractmethod + def save_event_queue_disabled(self) -> None: + """Save that event queue feature is disabled in the storage""" + raise NotImplementedError + + @abstractmethod + def get_event_queue_state(self) -> bool: + """Get whether the event queue feature is enabled from the storage""" + raise NotImplementedError diff --git a/hathor/event/storage/memory_storage.py b/hathor/event/storage/memory_storage.py index 05843a727..1ba63081e 100644 --- a/hathor/event/storage/memory_storage.py +++ b/hathor/event/storage/memory_storage.py @@ -25,6 +25,7 @@ def __init__(self) -> None: self._last_event: Optional[BaseEvent] = None self._last_group_id: Optional[int] = None self._node_state: Optional[NodeState] = None + self._event_queue_enabled: bool = False def save_event(self, event: BaseEvent) -> None: if event.id != len(self._events): @@ -67,3 +68,12 @@ def save_node_state(self, state: NodeState) -> None: def get_node_state(self) -> Optional[NodeState]: return self._node_state + + def save_event_queue_enabled(self) -> None: + self._event_queue_enabled = True + + def save_event_queue_disabled(self) -> None: + self._event_queue_enabled = False + + def get_event_queue_state(self) -> bool: + return self._event_queue_enabled diff --git a/hathor/event/storage/rocksdb_storage.py b/hathor/event/storage/rocksdb_storage.py index eb1de2bd0..decd726b4 100644 --- a/hathor/event/storage/rocksdb_storage.py +++ b/hathor/event/storage/rocksdb_storage.py @@ -25,6 +25,7 @@ _CF_NAME_META = b'event-metadata' _KEY_LAST_GROUP_ID = b'last-group-id' _KEY_NODE_STATE = b'node-state' +_KEY_EVENT_QUEUE_ENABLED = b'event-queue-enabled' class EventRocksDBStorage(EventStorage): @@ -111,3 +112,23 @@ def get_node_state(self) -> Optional[NodeState]: node_state_int = bytes_to_int(node_state_bytes) return NodeState(node_state_int) + + def save_event_queue_enabled(self) -> None: + self._save_event_queue_state(True) + + def save_event_queue_disabled(self) -> None: + self._save_event_queue_state(False) + + def _save_event_queue_state(self, enabled: bool) -> None: + self._db.put( + (self._cf_meta, _KEY_EVENT_QUEUE_ENABLED), + enabled.to_bytes(length=1, byteorder='big') + ) + + def get_event_queue_state(self) -> bool: + enabled_bytes = self._db.get((self._cf_meta, _KEY_EVENT_QUEUE_ENABLED)) + + if enabled_bytes is None: + return False + + return bool.from_bytes(enabled_bytes, byteorder='big') diff --git a/tests/event/test_event_storage.py b/tests/event/test_event_storage.py index 066baacc8..de8102a3f 100644 --- a/tests/event/test_event_storage.py +++ b/tests/event/test_event_storage.py @@ -121,19 +121,36 @@ def test_save_node_state_and_retrieve(self): assert node_state == NodeState.SYNC + def test_get_empty_event_queue_state(self): + enabled = self.event_storage.get_event_queue_state() + + assert enabled is False + + def test_save_event_queue_enabled_and_retrieve(self): + self.event_storage.save_event_queue_enabled() + enabled = self.event_storage.get_event_queue_state() + + assert enabled is True + + def test_save_event_queue_disabled_and_retrieve(self): + self.event_storage.save_event_queue_disabled() + enabled = self.event_storage.get_event_queue_state() + + assert enabled is False + def test_clear_events_empty_database(self): self._test_clear_events() - def _test_clear_events(self, expected_node_state: Optional[NodeState] = None) -> None: + def _test_clear_events(self) -> None: self.event_storage.clear_events() events = list(self.event_storage.iter_from_event(0)) + last_event = self.event_storage.get_last_event() last_group_id = self.event_storage.get_last_group_id() - node_state = self.event_storage.get_node_state() assert events == [] + assert last_event is None assert last_group_id is None - assert node_state == expected_node_state def test_clear_events_full_database(self): n_events = 10 @@ -142,16 +159,25 @@ def test_clear_events_full_database(self): self._populate_events_and_last_group_id(n_events=n_events, last_group_id=4) self.event_storage.save_node_state(expected_node_state) + self.event_storage.save_event_queue_enabled() events = list(self.event_storage.iter_from_event(0)) last_group_id = self.event_storage.get_last_group_id() node_state = self.event_storage.get_node_state() + event_queue_state = self.event_storage.get_event_queue_state() assert len(events) == n_events assert last_group_id == expected_last_group_id assert node_state == expected_node_state + assert event_queue_state is True + + self._test_clear_events() - self._test_clear_events(expected_node_state) + node_state = self.event_storage.get_node_state() + event_queue_state = self.event_storage.get_event_queue_state() + + assert node_state == expected_node_state + assert event_queue_state is True @pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb') From 8675e4cea5fcd82c54ee83b8d57e843f2d827dd1 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Wed, 26 Apr 2023 13:07:53 -0300 Subject: [PATCH 17/21] feat(events): set event_storage on HathorManager --- hathor/builder/builder.py | 2 ++ hathor/manager.py | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index c3815ed27..79115041d 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -24,6 +24,7 @@ from hathor.event import EventManager from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage from hathor.event.websocket import EventWebsocketFactory +from hathor.event.storage import EventStorage, EventMemoryStorage, EventRocksDBStorage from hathor.indexes import IndexesManager from hathor.manager import HathorManager from hathor.p2p.manager import ConnectionsManager @@ -131,6 +132,7 @@ def build(self) -> BuildArtifacts: event_storage = self._get_or_create_event_storage() event_manager = self._get_or_create_event_manager() tx_storage = self._get_or_create_tx_storage() + event_storage = self._get_or_create_event_storage() indexes = tx_storage.indexes assert indexes is not None diff --git a/hathor/manager.py b/hathor/manager.py index 7762b659a..7eecb12df 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -125,6 +125,11 @@ def __init__(self, if not (enable_sync_v1 or enable_sync_v1_1 or enable_sync_v2): raise TypeError(f'{type(self).__name__}() at least one sync version is required') + if event_storage.get_event_queue_state() is True and not event_manager: + raise ValueError( + 'cannot start manager without event queue feature, as it was enabled in the previous startup' + ) + self._enable_sync_v1 = enable_sync_v1 self._enable_sync_v2 = enable_sync_v2 @@ -176,6 +181,10 @@ def __init__(self, if self._event_manager: assert self._event_manager.event_storage == event_storage + event_storage.save_event_queue_enabled() + else: + event_storage.save_event_queue_disabled() + if enable_sync_v2: assert self.tx_storage.indexes is not None self.log.debug('enable sync-v2 indexes') From 1e787fe2246f2b8f6ad0d6b0fb28a863c54ae31d Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Wed, 26 Apr 2023 14:01:24 -0300 Subject: [PATCH 18/21] feat(events): reorganize rebased code --- hathor/builder/builder.py | 1 - 1 file changed, 1 deletion(-) diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index 79115041d..7b562315d 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -132,7 +132,6 @@ def build(self) -> BuildArtifacts: event_storage = self._get_or_create_event_storage() event_manager = self._get_or_create_event_manager() tx_storage = self._get_or_create_tx_storage() - event_storage = self._get_or_create_event_storage() indexes = tx_storage.indexes assert indexes is not None From 068b766e65f04a43a690c5bbd4a36ce0f570bbf8 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Wed, 26 Apr 2023 17:08:55 -0300 Subject: [PATCH 19/21] chore(builder): fix tests --- hathor/builder/builder.py | 1 - tests/event/test_event_storage.py | 1 - 2 files changed, 2 deletions(-) diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index 7b562315d..c3815ed27 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -24,7 +24,6 @@ from hathor.event import EventManager from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage from hathor.event.websocket import EventWebsocketFactory -from hathor.event.storage import EventStorage, EventMemoryStorage, EventRocksDBStorage from hathor.indexes import IndexesManager from hathor.manager import HathorManager from hathor.p2p.manager import ConnectionsManager diff --git a/tests/event/test_event_storage.py b/tests/event/test_event_storage.py index de8102a3f..4df05617b 100644 --- a/tests/event/test_event_storage.py +++ b/tests/event/test_event_storage.py @@ -1,5 +1,4 @@ import tempfile -from typing import Optional import pytest From c5227cdaa40ada7ce6e5252dbf19e26ab0156b18 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Tue, 2 May 2023 00:17:40 -0300 Subject: [PATCH 20/21] feat(events): reorganize event manager and storage initialization --- hathor/builder/builder.py | 13 ++++++++----- hathor/builder/cli_builder.py | 18 ++++++++--------- hathor/event/event_manager.py | 21 +++++++++++++++----- hathor/event/storage/event_storage.py | 9 ++------- hathor/event/storage/memory_storage.py | 7 ++----- hathor/event/storage/rocksdb_storage.py | 8 +------- hathor/manager.py | 26 ++++++++++--------------- tests/event/test_event_storage.py | 6 +++--- tests/others/test_cli_builder.py | 2 +- 9 files changed, 52 insertions(+), 58 deletions(-) diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index c3815ed27..a5954880e 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -85,6 +85,7 @@ def __init__(self) -> None: self._event_manager: Optional[EventManager] = None self._event_ws_factory: Optional[EventWebsocketFactory] = None + self._enable_event_queue: Optional[bool] = None self._rocksdb_path: Optional[str] = None self._rocksdb_storage: Optional[RocksDBStorage] = None @@ -128,7 +129,6 @@ def build(self) -> BuildArtifacts: consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub) wallet = self._get_or_create_wallet() - event_storage = self._get_or_create_event_storage() event_manager = self._get_or_create_event_manager() tx_storage = self._get_or_create_tx_storage() indexes = tx_storage.indexes @@ -161,20 +161,22 @@ def build(self) -> BuildArtifacts: if self._full_verification is not None: kwargs['full_verification'] = self._full_verification + if self._enable_event_queue is not None: + kwargs['enable_event_queue'] = self._enable_event_queue + manager = HathorManager( reactor, pubsub=pubsub, consensus_algorithm=consensus_algorithm, peer_id=peer_id, tx_storage=tx_storage, - event_storage=event_storage, + event_manager=event_manager, network=self._network, wallet=wallet, rng=self._rng, checkpoints=self._checkpoints, capabilities=self._capabilities, environment_info=get_environment_info(self._cmdline, peer_id.id), - event_manager=event_manager, **kwargs ) @@ -303,8 +305,8 @@ def _get_or_create_event_storage(self) -> EventStorage: return self._event_storage - def _get_or_create_event_manager(self) -> Optional[EventManager]: - if self._event_manager is None and self._event_ws_factory is not None: + def _get_or_create_event_manager(self) -> EventManager: + if self._event_manager is None: self._event_manager = EventManager( reactor=self._get_reactor(), pubsub=self._get_or_create_pubsub(), @@ -389,6 +391,7 @@ def enable_wallet_index(self) -> 'Builder': def enable_event_manager(self, *, event_ws_factory: EventWebsocketFactory) -> 'Builder': self.check_if_can_modify() + self._enable_event_queue = True self._event_ws_factory = event_ws_factory return self diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index 4eebe317c..b6ed4006a 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -136,15 +136,15 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa pubsub = PubSubManager(reactor) - event_manager: Optional[EventManager] = None if args.x_enable_event_queue: self.event_ws_factory = EventWebsocketFactory(reactor, event_storage) - event_manager = EventManager( - event_storage=event_storage, - event_ws_factory=self.event_ws_factory, - pubsub=pubsub, - reactor=reactor - ) + + event_manager = EventManager( + event_storage=event_storage, + event_ws_factory=self.event_ws_factory, + pubsub=pubsub, + reactor=reactor + ) if args.wallet_index and tx_storage.indexes is not None: self.log.debug('enable wallet indexes') @@ -170,7 +170,6 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa network=network, hostname=hostname, tx_storage=tx_storage, - event_storage=event_storage, event_manager=event_manager, wallet=self.wallet, stratum_port=args.stratum, @@ -181,7 +180,8 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa enable_sync_v2=enable_sync_v2, consensus_algorithm=consensus_algorithm, environment_info=get_environment_info(args=str(args), peer_id=peer_id.id), - full_verification=full_verification + full_verification=full_verification, + enable_event_queue=bool(args.x_enable_event_queue) ) if args.data: diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index 9c6c7ac67..ff3197b29 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -64,9 +64,9 @@ def event_storage(self) -> EventStorage: def __init__( self, event_storage: EventStorage, - event_ws_factory: EventWebsocketFactory, pubsub: PubSubManager, - reactor: Reactor + reactor: Reactor, + event_ws_factory: Optional[EventWebsocketFactory] = None, ): self.log = logger.new() @@ -75,11 +75,10 @@ def __init__( self._event_ws_factory = event_ws_factory self._pubsub = pubsub - self._assert_closed_event_group() - self._subscribe_events() - def start(self, peer_id: str) -> None: assert self._is_running is False, 'Cannot start, EventManager is already running' + assert self._event_ws_factory is not None, 'Cannot start, EventWebsocketFactory is not set' + assert self.get_event_queue_state() is True, 'Cannot start, event queue feature is disabled' self._previous_node_state = self._event_storage.get_node_state() @@ -89,12 +88,16 @@ def start(self, peer_id: str) -> None: self._last_event = self._event_storage.get_last_event() self._last_existing_group_id = self._event_storage.get_last_group_id() + self._assert_closed_event_group() + self._subscribe_events() + self._peer_id = peer_id self._event_ws_factory.start() self._is_running = True def stop(self): assert self._is_running is True, 'Cannot stop, EventManager is not running' + assert self._event_ws_factory is not None self._event_ws_factory.stop() self._is_running = False @@ -138,6 +141,7 @@ def _handle_event(self, event_type: EventType, event_args: EventArguments) -> No self._handle_event_creation(event_type, event_args) def _handle_event_creation(self, event_type: EventType, event_args: EventArguments) -> None: + assert self._event_ws_factory is not None create_event_fn: Callable[[EventType, EventArguments], BaseEvent] if event_type in _GROUP_START_EVENTS: @@ -214,6 +218,13 @@ def _handle_load_finished(self): def _should_reload_events(self) -> bool: return self._previous_node_state in [None, NodeState.LOAD] + def get_event_queue_state(self) -> bool: + """Get whether the event queue feature is enabled from the storage""" + return self._event_storage.get_event_queue_state() + + def save_event_queue_state(self, state: bool) -> None: + self._event_storage.save_event_queue_state(state) + def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransaction]) -> None: """ Either generates load phase events or not, depending on previous node state. diff --git a/hathor/event/storage/event_storage.py b/hathor/event/storage/event_storage.py index f659630c5..90a68d761 100644 --- a/hathor/event/storage/event_storage.py +++ b/hathor/event/storage/event_storage.py @@ -61,13 +61,8 @@ def get_node_state(self) -> Optional[NodeState]: raise NotImplementedError @abstractmethod - def save_event_queue_enabled(self) -> None: - """Save that event queue feature is enabled in the storage""" - raise NotImplementedError - - @abstractmethod - def save_event_queue_disabled(self) -> None: - """Save that event queue feature is disabled in the storage""" + def save_event_queue_state(self, enabled: bool) -> None: + """Save whether the event queue feature is enabled in the storage""" raise NotImplementedError @abstractmethod diff --git a/hathor/event/storage/memory_storage.py b/hathor/event/storage/memory_storage.py index 1ba63081e..569d51d0f 100644 --- a/hathor/event/storage/memory_storage.py +++ b/hathor/event/storage/memory_storage.py @@ -69,11 +69,8 @@ def save_node_state(self, state: NodeState) -> None: def get_node_state(self) -> Optional[NodeState]: return self._node_state - def save_event_queue_enabled(self) -> None: - self._event_queue_enabled = True - - def save_event_queue_disabled(self) -> None: - self._event_queue_enabled = False + def save_event_queue_state(self, enabled: bool) -> None: + self._event_queue_enabled = enabled def get_event_queue_state(self) -> bool: return self._event_queue_enabled diff --git a/hathor/event/storage/rocksdb_storage.py b/hathor/event/storage/rocksdb_storage.py index decd726b4..eae5f5305 100644 --- a/hathor/event/storage/rocksdb_storage.py +++ b/hathor/event/storage/rocksdb_storage.py @@ -113,13 +113,7 @@ def get_node_state(self) -> Optional[NodeState]: return NodeState(node_state_int) - def save_event_queue_enabled(self) -> None: - self._save_event_queue_state(True) - - def save_event_queue_disabled(self) -> None: - self._save_event_queue_state(False) - - def _save_event_queue_state(self, enabled: bool) -> None: + def save_event_queue_state(self, enabled: bool) -> None: self._db.put( (self._cf_meta, _KEY_EVENT_QUEUE_ENABLED), enabled.to_bytes(length=1, byteorder='big') diff --git a/hathor/manager.py b/hathor/manager.py index 7eecb12df..b608f4386 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -30,7 +30,6 @@ from hathor.conf import HathorSettings from hathor.consensus import ConsensusAlgorithm from hathor.event.event_manager import EventManager -from hathor.event.storage import EventStorage from hathor.exception import ( DoubleSpendingError, HathorError, @@ -88,11 +87,10 @@ def __init__(self, consensus_algorithm: ConsensusAlgorithm, peer_id: PeerId, tx_storage: TransactionStorage, - event_storage: EventStorage, + event_manager: EventManager, network: str, hostname: Optional[str] = None, wallet: Optional[BaseWallet] = None, - event_manager: Optional[EventManager] = None, stratum_port: Optional[int] = None, ssl: bool = True, enable_sync_v1: bool = False, @@ -102,7 +100,8 @@ def __init__(self, checkpoints: Optional[List[Checkpoint]] = None, rng: Optional[Random] = None, environment_info: Optional[EnvironmentInfo] = None, - full_verification: bool = False): + full_verification: bool = False, + enable_event_queue: bool = False): """ :param reactor: Twisted reactor which handles the mainloop and the events. :param peer_id: Id of this node. @@ -125,7 +124,7 @@ def __init__(self, if not (enable_sync_v1 or enable_sync_v1_1 or enable_sync_v2): raise TypeError(f'{type(self).__name__}() at least one sync version is required') - if event_storage.get_event_queue_state() is True and not event_manager: + if event_manager.get_event_queue_state() is True and not enable_event_queue: raise ValueError( 'cannot start manager without event queue feature, as it was enabled in the previous startup' ) @@ -177,13 +176,8 @@ def __init__(self, self.tx_storage.pubsub = self.pubsub self._event_manager = event_manager - - if self._event_manager: - assert self._event_manager.event_storage == event_storage - - event_storage.save_event_queue_enabled() - else: - event_storage.save_event_queue_disabled() + self._event_manager.save_event_queue_state(enable_event_queue) + self._enable_event_queue = enable_event_queue if enable_sync_v2: assert self.tx_storage.indexes is not None @@ -294,7 +288,7 @@ def start(self) -> None: ) sys.exit(-1) - if self._event_manager: + if self._enable_event_queue: self._event_manager.start(not_none(self.my_peer.id)) self.state = self.NodeState.INITIALIZING @@ -364,7 +358,7 @@ def stop(self) -> Deferred: if wait_stratum: waits.append(wait_stratum) - if self._event_manager: + if self._enable_event_queue: self._event_manager.stop() self.tx_storage.flush() @@ -405,7 +399,7 @@ def _initialize_components(self) -> None: This method runs through all transactions, verifying them and updating our wallet. """ - assert not self._event_manager, 'this method cannot be used if the events feature is enabled.' + assert not self._enable_event_queue, 'this method cannot be used if the events feature is enabled.' self.log.info('initialize') if self.wallet: @@ -659,7 +653,7 @@ def _initialize_components_new(self) -> None: # XXX: last step before actually starting is updating the last started at timestamps self.tx_storage.update_last_started_at(started_at) - if self._event_manager: + if self._enable_event_queue: topological_iterator = self.tx_storage.topological_iterator() self._event_manager.handle_load_phase_vertices(topological_iterator) diff --git a/tests/event/test_event_storage.py b/tests/event/test_event_storage.py index 4df05617b..4889bcb15 100644 --- a/tests/event/test_event_storage.py +++ b/tests/event/test_event_storage.py @@ -126,13 +126,13 @@ def test_get_empty_event_queue_state(self): assert enabled is False def test_save_event_queue_enabled_and_retrieve(self): - self.event_storage.save_event_queue_enabled() + self.event_storage.save_event_queue_state(True) enabled = self.event_storage.get_event_queue_state() assert enabled is True def test_save_event_queue_disabled_and_retrieve(self): - self.event_storage.save_event_queue_disabled() + self.event_storage.save_event_queue_state(False) enabled = self.event_storage.get_event_queue_state() assert enabled is False @@ -158,7 +158,7 @@ def test_clear_events_full_database(self): self._populate_events_and_last_group_id(n_events=n_events, last_group_id=4) self.event_storage.save_node_state(expected_node_state) - self.event_storage.save_event_queue_enabled() + self.event_storage.save_event_queue_state(True) events = list(self.event_storage.iter_from_event(0)) last_group_id = self.event_storage.get_last_group_id() diff --git a/tests/others/test_cli_builder.py b/tests/others/test_cli_builder.py index 956867fe7..4aa4b0e00 100644 --- a/tests/others/test_cli_builder.py +++ b/tests/others/test_cli_builder.py @@ -58,7 +58,7 @@ def test_all_default(self): self.assertNotIn(SyncVersion.V2, manager.connections._sync_factories) self.assertFalse(self.resources_builder._built_prometheus) self.assertFalse(self.resources_builder._built_status) - self.assertIsNone(manager._event_manager) + self.assertFalse(manager._enable_event_queue) @pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb') def test_cache_storage(self): From 75f3d662bde57c4135034dd470580662b049a669 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Fri, 12 May 2023 22:48:37 -0300 Subject: [PATCH 21/21] fix rebase conflicts --- hathor/builder/resources_builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hathor/builder/resources_builder.py b/hathor/builder/resources_builder.py index dd9022558..8925c0d5b 100644 --- a/hathor/builder/resources_builder.py +++ b/hathor/builder/resources_builder.py @@ -250,7 +250,7 @@ def create_resources(self, args: Namespace) -> server.Site: ws_factory.subscribe(self.manager.pubsub) # Event websocket resource - if args.x_enable_event_queue and self.event_ws_factory is not None: + if args.x_enable_event_queue: root.putChild(b'event_ws', WebSocketResource(self.event_ws_factory)) root.putChild(b'event', EventResource(self.manager._event_manager))