Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from hathor.conf.settings import HathorSettings as HathorSettingsType
from hathor.consensus import ConsensusAlgorithm
from hathor.event import EventManager
from hathor.event.model.EventQueueOptions import EventQueueOptions
from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage
from hathor.event.websocket import EventWebsocketFactory
from hathor.indexes import IndexesManager
Expand Down Expand Up @@ -83,7 +84,7 @@ def __init__(self) -> None:

self._event_manager: Optional[EventManager] = None
self._event_ws_factory: Optional[EventWebsocketFactory] = None
self._enable_event_queue: Optional[bool] = None
self._event_queue_options: Optional[EventQueueOptions] = None

self._rocksdb_path: Optional[str] = None
self._rocksdb_storage: Optional[RocksDBStorage] = None
Expand Down Expand Up @@ -158,8 +159,8 @@ def build(self) -> BuildArtifacts:
if self._full_verification is not None:
kwargs['full_verification'] = self._full_verification

if self._enable_event_queue is not None:
kwargs['enable_event_queue'] = self._enable_event_queue
if self._event_queue_options is not None:
kwargs['event_queue_options'] = self._event_queue_options

manager = HathorManager(
reactor,
Expand Down Expand Up @@ -387,7 +388,7 @@ def enable_wallet_index(self) -> 'Builder':

def enable_event_manager(self, *, event_ws_factory: EventWebsocketFactory) -> 'Builder':
self.check_if_can_modify()
self._enable_event_queue = True
self._event_queue_options = EventQueueOptions(enable=True)
self._event_ws_factory = event_ws_factory
return self

Expand Down
25 changes: 15 additions & 10 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from hathor.consensus import ConsensusAlgorithm
from hathor.event import EventManager
from hathor.event.model.EventQueueOptions import EventQueueOptions
from hathor.event.resources.event import EventResource
from hathor.exception import BuilderError
from hathor.indexes import IndexesManager
Expand Down Expand Up @@ -170,6 +171,19 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
soft_voided_tx_ids = set(settings.SOFT_VOIDED_TX_IDS)
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub=pubsub)

if args.x_enable_event_queue:
if not settings.ENABLE_EVENT_QUEUE_FEATURE:
self.log.error('The event queue feature is not available yet')
sys.exit(-1)

self.log.info('--x-enable-event-queue flag provided. '
'The events detected by the full node will be stored and can be retrieved by clients')

event_queue_options = EventQueueOptions(
enable=bool(args.x_enable_event_queue),
reset=bool(args.x_reset_event_queue)
)

self.manager = HathorManager(
reactor,
pubsub=pubsub,
Expand All @@ -187,7 +201,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
consensus_algorithm=consensus_algorithm,
environment_info=get_environment_info(args=str(args), peer_id=peer_id.id),
full_verification=full_verification,
enable_event_queue=bool(args.x_enable_event_queue)
event_queue_options=event_queue_options
)

if args.allow_mining_without_peers:
Expand Down Expand Up @@ -224,15 +238,6 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
if args.memory_indexes and args.memory_storage:
self.log.warn('--memory-indexes is implied for memory storage or JSON storage')

if args.x_enable_event_queue:
if not settings.ENABLE_EVENT_QUEUE_FEATURE:
self.log.error('The event queue feature is not available yet')
sys.exit(-1)

self.manager.enable_event_queue = True
self.log.info('--x-enable-event-queue flag provided. '
'The events detected by the full node will be stored and can be retrieved by clients')

for description in args.listen:
self.manager.add_listen_address(description)

Expand Down
1 change: 1 addition & 0 deletions hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def create_parser(cls) -> ArgumentParser:
parser.add_argument('--x-localhost-only', action='store_true', help='Only connect to peers on localhost')
parser.add_argument('--x-rocksdb-indexes', action='store_true', help=SUPPRESS)
parser.add_argument('--x-enable-event-queue', action='store_true', help='Enable event queue mechanism')
parser.add_argument('--x-reset-event-queue', action='store_true', help='Reset the event queue')
parser.add_argument('--peer-id-blacklist', action='extend', default=[], nargs='+', type=str,
help='Peer IDs to forbid connection')
return parser
Expand Down
6 changes: 5 additions & 1 deletion hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def start(self, peer_id: str) -> None:
self._previous_node_state = self._event_storage.get_node_state()

if self._should_reload_events():
self._event_storage.clear_events()
self._event_storage.reset_events()
else:
self._last_event = self._event_storage.get_last_event()
self._last_existing_group_id = self._event_storage.get_last_group_id()
Expand Down Expand Up @@ -218,6 +218,10 @@ def _handle_load_finished(self):
def _should_reload_events(self) -> bool:
return self._previous_node_state in [None, NodeState.LOAD]

def reset_all(self) -> None:
"""Reset all data."""
self._event_storage.reset_all()

def get_event_queue_state(self) -> bool:
"""Get whether the event queue feature is enabled from the storage"""
return self._event_storage.get_event_queue_state()
Expand Down
20 changes: 20 additions & 0 deletions hathor/event/model/EventQueueOptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from hathor.utils.pydantic import BaseModel


class EventQueueOptions(BaseModel):
enable: bool = False
reset: bool = False
3 changes: 2 additions & 1 deletion hathor/event/model/event_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def from_event_arguments(cls, args: EventArguments) -> 'EmptyData':

class TxData(BaseEventData, extra=Extra.ignore):
hash: str
nonce: int
nonce: Optional[int] = None
timestamp: int
version: int
weight: float
Expand All @@ -100,6 +100,7 @@ class TxData(BaseEventData, extra=Extra.ignore):
token_name: Optional[str]
token_symbol: Optional[str]
metadata: 'TxMetadata'
aux_pow: Optional[str] = None

@classmethod
def from_event_arguments(cls, args: EventArguments) -> 'TxData':
Expand Down
9 changes: 7 additions & 2 deletions hathor/event/storage/event_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,13 @@ def iter_from_event(self, key: int) -> Iterator[BaseEvent]:
raise NotImplementedError

@abstractmethod
def clear_events(self) -> None:
"""Clear all stored events and related metadata."""
def reset_events(self) -> None:
"""Reset stored events and related metadata."""
raise NotImplementedError

@abstractmethod
def reset_all(self) -> None:
"""Reset all data."""
raise NotImplementedError

@abstractmethod
Expand Down
7 changes: 6 additions & 1 deletion hathor/event/storage/memory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,16 @@ def iter_from_event(self, key: int) -> Iterator[BaseEvent]:
yield self._events[key]
key += 1

def clear_events(self) -> None:
def reset_events(self) -> None:
self._events = []
self._last_event = None
self._last_group_id = None

def reset_all(self) -> None:
self.reset_events()
self._node_state = None
self._event_queue_enabled = False

def save_node_state(self, state: NodeState) -> None:
self._node_state = state

Expand Down
7 changes: 6 additions & 1 deletion hathor/event/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def get_last_event(self) -> Optional[BaseEvent]:
def get_last_group_id(self) -> Optional[int]:
return self._last_group_id

def clear_events(self) -> None:
def reset_events(self) -> None:
self._last_event = None
self._last_group_id = None

Expand All @@ -100,6 +100,11 @@ def clear_events(self) -> None:

self._cf_event = self._rocksdb_storage.get_or_create_column_family(_CF_NAME_EVENT)

def reset_all(self) -> None:
self.reset_events()
self._db.delete((self._cf_meta, _KEY_NODE_STATE))
self._db.delete((self._cf_meta, _KEY_EVENT_QUEUE_ENABLED))

def save_node_state(self, state: NodeState) -> None:
self._db.put((self._cf_meta, _KEY_NODE_STATE), int_to_bytes(state.value, 8))

Expand Down
24 changes: 12 additions & 12 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from hathor.conf import HathorSettings
from hathor.consensus import ConsensusAlgorithm
from hathor.event.event_manager import EventManager
from hathor.event.model.EventQueueOptions import EventQueueOptions
from hathor.exception import (
DoubleSpendingError,
HathorError,
Expand Down Expand Up @@ -100,7 +101,7 @@ def __init__(self,
rng: Optional[Random] = None,
environment_info: Optional[EnvironmentInfo] = None,
full_verification: bool = False,
enable_event_queue: bool = False):
event_queue_options: EventQueueOptions = EventQueueOptions()):
"""
:param reactor: Twisted reactor which handles the mainloop and the events.
:param peer_id: Id of this node.
Expand All @@ -123,7 +124,10 @@ def __init__(self,
if not (enable_sync_v1 or enable_sync_v2):
raise TypeError(f'{type(self).__name__}() at least one sync version is required')

if event_manager.get_event_queue_state() is True and not enable_event_queue:
if event_queue_options.reset:
event_manager.reset_all()

if event_manager.get_event_queue_state() is True and not event_queue_options.enable:
raise ValueError(
'cannot start manager without event queue feature, as it was enabled in the previous startup'
)
Expand Down Expand Up @@ -173,8 +177,8 @@ def __init__(self,
self.tx_storage.pubsub = self.pubsub

self._event_manager = event_manager
self._event_manager.save_event_queue_state(enable_event_queue)
self._enable_event_queue = enable_event_queue
self._event_manager.save_event_queue_state(event_queue_options.enable)
self._event_queue_options = event_queue_options

if enable_sync_v2:
assert self.tx_storage.indexes is not None
Expand Down Expand Up @@ -227,10 +231,6 @@ def __init__(self,
# Can be activated on the command line with --full-verification
self._full_verification = full_verification

# Activated with --x-enable-event-queue flag
# It activates the event mechanism inside full node
self.enable_event_queue = False

# List of whitelisted peers
self.peers_whitelist: List[str] = []

Expand Down Expand Up @@ -284,7 +284,7 @@ def start(self) -> None:
)
sys.exit(-1)

if self._enable_event_queue:
if self._event_queue_options.enable:
self._event_manager.start(not_none(self.my_peer.id))

self.state = self.NodeState.INITIALIZING
Expand Down Expand Up @@ -354,7 +354,7 @@ def stop(self) -> Deferred:
if wait_stratum:
waits.append(wait_stratum)

if self._enable_event_queue:
if self._event_queue_options.enable:
self._event_manager.stop()

self.tx_storage.flush()
Expand Down Expand Up @@ -395,7 +395,7 @@ def _initialize_components(self) -> None:

This method runs through all transactions, verifying them and updating our wallet.
"""
assert not self._enable_event_queue, 'this method cannot be used if the events feature is enabled.'
assert not self._event_queue_options.enable, 'this method cannot be used if the events feature is enabled.'

self.log.info('initialize')
if self.wallet:
Expand Down Expand Up @@ -649,7 +649,7 @@ def _initialize_components_new(self) -> None:
# XXX: last step before actually starting is updating the last started at timestamps
self.tx_storage.update_last_started_at(started_at)

if self._enable_event_queue:
if self._event_queue_options.enable:
topological_iterator = self.tx_storage.topological_iterator()
self._event_manager.handle_load_phase_vertices(topological_iterator)

Expand Down
57 changes: 51 additions & 6 deletions tests/event/test_event_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,14 @@ def test_save_event_queue_disabled_and_retrieve(self):

assert enabled is False

def test_clear_events_empty_database(self):
self._test_clear_events()
def test_reset_events_empty_database(self):
self._test_reset_events()

def _test_clear_events(self) -> None:
self.event_storage.clear_events()
def test_reset_all_empty_database(self):
self._test_reset_events()

def _test_reset_events(self) -> None:
self.event_storage.reset_events()

events = list(self.event_storage.iter_from_event(0))
last_event = self.event_storage.get_last_event()
Expand All @@ -151,7 +154,22 @@ def _test_clear_events(self) -> None:
assert last_event is None
assert last_group_id is None

def test_clear_events_full_database(self):
def _test_reset_all(self) -> None:
self.event_storage.reset_all()

events = list(self.event_storage.iter_from_event(0))
last_event = self.event_storage.get_last_event()
last_group_id = self.event_storage.get_last_group_id()
node_state = self.event_storage.get_node_state()
event_queue_state = self.event_storage.get_event_queue_state()

assert events == []
assert last_event is None
assert last_group_id is None
assert node_state is None
assert event_queue_state is False

def test_reset_events_full_database(self):
n_events = 10
expected_last_group_id = 4
expected_node_state = NodeState.SYNC
Expand All @@ -170,14 +188,41 @@ def test_clear_events_full_database(self):
assert node_state == expected_node_state
assert event_queue_state is True

self._test_clear_events()
self._test_reset_events()

node_state = self.event_storage.get_node_state()
event_queue_state = self.event_storage.get_event_queue_state()

assert node_state == expected_node_state
assert event_queue_state is True

def test_reset_all_full_database(self):
n_events = 10
expected_last_group_id = 4
expected_node_state = NodeState.SYNC

self._populate_events_and_last_group_id(n_events=n_events, last_group_id=4)
self.event_storage.save_node_state(expected_node_state)
self.event_storage.save_event_queue_state(True)

events = list(self.event_storage.iter_from_event(0))
last_group_id = self.event_storage.get_last_group_id()
node_state = self.event_storage.get_node_state()
event_queue_state = self.event_storage.get_event_queue_state()

assert len(events) == n_events
assert last_group_id == expected_last_group_id
assert node_state == expected_node_state
assert event_queue_state is True

self._test_reset_all()

node_state = self.event_storage.get_node_state()
event_queue_state = self.event_storage.get_event_queue_state()

assert node_state is None
assert event_queue_state is False


@pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb')
class EventStorageRocksDBTest(EventStorageBaseTest):
Expand Down
Loading