Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
soft_voided_tx_ids = set(settings.SOFT_VOIDED_TX_IDS)
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub=pubsub)

if args.x_enable_event_queue:
if not settings.ENABLE_EVENT_QUEUE_FEATURE:
self.log.error('The event queue feature is not available yet')
sys.exit(-1)

self.log.info('--x-enable-event-queue flag provided. '
'The events detected by the full node will be stored and can be retrieved by clients')

self.manager = HathorManager(
reactor,
pubsub=pubsub,
Expand All @@ -185,7 +193,7 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
consensus_algorithm=consensus_algorithm,
environment_info=get_environment_info(args=str(args), peer_id=peer_id.id),
full_verification=full_verification,
enable_event_queue=bool(args.x_enable_event_queue)
enable_event_queue=args.x_enable_event_queue
)

if args.stratum:
Expand Down Expand Up @@ -230,15 +238,6 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
if args.memory_indexes and args.memory_storage:
self.log.warn('--memory-indexes is implied for memory storage or JSON storage')

if args.x_enable_event_queue:
if not settings.ENABLE_EVENT_QUEUE_FEATURE:
self.log.error('The event queue feature is not available yet')
sys.exit(-1)

self.manager.enable_event_queue = True
self.log.info('--x-enable-event-queue flag provided. '
'The events detected by the full node will be stored and can be retrieved by clients')

for description in args.listen:
self.manager.add_listen_address(description)

Expand Down
3 changes: 3 additions & 0 deletions hathor/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(self) -> None:
peer_id,
quick_test,
replay_logs,
reset_event_queue,
run_node,
shell,
stratum_mining,
Expand Down Expand Up @@ -78,6 +79,8 @@ def __init__(self) -> None:
self.add_cmd('oracle', 'oracle-get-pubkey', oracle_get_pubkey,
'Read an oracle private key and output public key hash')
self.add_cmd('oracle', 'oracle-encode-data', oracle_encode_data, 'Encode data and sign it with a private key')
self.add_cmd('events', 'reset-event-queue', reset_event_queue, 'Delete all events and related data from the '
'database')
self.add_cmd('dev', 'shell', shell, 'Run a Python shell')
self.add_cmd('dev', 'quick_test', quick_test, 'Similar to run_node but will quit after receiving a tx')
self.add_cmd('dev', 'generate_nginx_config', nginx_config, 'Generate nginx config from OpenAPI json')
Expand Down
48 changes: 48 additions & 0 deletions hathor/cli/reset_event_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from argparse import ArgumentParser, Namespace

from structlog import get_logger

logger = get_logger()


def create_parser() -> ArgumentParser:
from hathor.cli.util import create_parser

parser = create_parser()
parser.add_argument('--data', help='Data directory')

return parser


def execute(args: Namespace) -> None:
from hathor.event.storage import EventRocksDBStorage
from hathor.storage import RocksDBStorage

assert args.data is not None, '--data is required'

rocksdb_storage = RocksDBStorage(path=args.data)
event_storage = EventRocksDBStorage(rocksdb_storage)

logger.info('removing all events and related data...')
event_storage.reset_all()
logger.info('reset complete')


def main():
parser = create_parser()
args = parser.parse_args()
execute(args)
2 changes: 1 addition & 1 deletion hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def start(self, peer_id: str) -> None:
self._previous_node_state = self._event_storage.get_node_state()

if self._should_reload_events():
self._event_storage.clear_events()
self._event_storage.reset_events()
else:
self._last_event = self._event_storage.get_last_event()
self._last_existing_group_id = self._event_storage.get_last_group_id()
Expand Down
3 changes: 2 additions & 1 deletion hathor/event/model/event_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def from_event_arguments(cls, args: EventArguments) -> 'EmptyData':

class TxData(BaseEventData, extra=Extra.ignore):
hash: str
nonce: int
nonce: Optional[int] = None
timestamp: int
version: int
weight: float
Expand All @@ -100,6 +100,7 @@ class TxData(BaseEventData, extra=Extra.ignore):
token_name: Optional[str]
token_symbol: Optional[str]
metadata: 'TxMetadata'
aux_pow: Optional[str] = None

@classmethod
def from_event_arguments(cls, args: EventArguments) -> 'TxData':
Expand Down
15 changes: 13 additions & 2 deletions hathor/event/storage/event_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,19 @@ def iter_from_event(self, key: int) -> Iterator[BaseEvent]:
raise NotImplementedError

@abstractmethod
def clear_events(self) -> None:
"""Clear all stored events and related metadata."""
def reset_events(self) -> None:
"""
Reset event-related data: events, last_event, and last_group_id.
This should be used to clear old events from the database when reloading events.
"""
raise NotImplementedError

@abstractmethod
def reset_all(self) -> None:
"""
Reset all data and metadata: events, last_event, last_group_id, node_state, and event_queue_enabled.
This should be used for a full wipe out of the event storage.
"""
raise NotImplementedError

@abstractmethod
Expand Down
7 changes: 6 additions & 1 deletion hathor/event/storage/memory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,16 @@ def iter_from_event(self, key: int) -> Iterator[BaseEvent]:
yield self._events[key]
key += 1

def clear_events(self) -> None:
def reset_events(self) -> None:
self._events = []
self._last_event = None
self._last_group_id = None

def reset_all(self) -> None:
self.reset_events()
self._node_state = None
self._event_queue_enabled = False

def save_node_state(self, state: NodeState) -> None:
self._node_state = state

Expand Down
7 changes: 6 additions & 1 deletion hathor/event/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def get_last_event(self) -> Optional[BaseEvent]:
def get_last_group_id(self) -> Optional[int]:
return self._last_group_id

def clear_events(self) -> None:
def reset_events(self) -> None:
self._last_event = None
self._last_group_id = None

Expand All @@ -100,6 +100,11 @@ def clear_events(self) -> None:

self._cf_event = self._rocksdb_storage.get_or_create_column_family(_CF_NAME_EVENT)

def reset_all(self) -> None:
self.reset_events()
self._db.delete((self._cf_meta, _KEY_NODE_STATE))
self._db.delete((self._cf_meta, _KEY_EVENT_QUEUE_ENABLED))

def save_node_state(self, state: NodeState) -> None:
self._db.put((self._cf_meta, _KEY_NODE_STATE), int_to_bytes(state.value, 8))

Expand Down
9 changes: 3 additions & 6 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ def __init__(self,
raise TypeError(f'{type(self).__name__}() at least one sync version is required')

if event_manager.get_event_queue_state() is True and not enable_event_queue:
raise ValueError(
'cannot start manager without event queue feature, as it was enabled in the previous startup'
raise InitializationError(
'Cannot start manager without event queue feature, as it was enabled in the previous startup. '
'Either enable it, or use the reset-event-queue CLI command to remove all event-related data'
)

self._enable_sync_v1 = enable_sync_v1
Expand Down Expand Up @@ -223,10 +224,6 @@ def __init__(self,
# Can be activated on the command line with --full-verification
self._full_verification = full_verification

# Activated with --x-enable-event-queue flag
# It activates the event mechanism inside full node
self.enable_event_queue = False

# List of whitelisted peers
self.peers_whitelist: List[str] = []

Expand Down
1 change: 1 addition & 0 deletions tests/event/test_base_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def test_create_base_event(event_id, group_id):
token_name=None,
token_symbol=None,
tokens=[],
aux_pow=None,
metadata=dict(
hash='abc',
spent_outputs=[],
Expand Down
57 changes: 51 additions & 6 deletions tests/event/test_event_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,14 @@ def test_save_event_queue_disabled_and_retrieve(self):

assert enabled is False

def test_clear_events_empty_database(self):
self._test_clear_events()
def test_reset_events_empty_database(self):
self._test_reset_events()

def _test_clear_events(self) -> None:
self.event_storage.clear_events()
def test_reset_all_empty_database(self):
self._test_reset_events()

def _test_reset_events(self) -> None:
self.event_storage.reset_events()

events = list(self.event_storage.iter_from_event(0))
last_event = self.event_storage.get_last_event()
Expand All @@ -151,7 +154,22 @@ def _test_clear_events(self) -> None:
assert last_event is None
assert last_group_id is None

def test_clear_events_full_database(self):
def _test_reset_all(self) -> None:
self.event_storage.reset_all()

events = list(self.event_storage.iter_from_event(0))
last_event = self.event_storage.get_last_event()
last_group_id = self.event_storage.get_last_group_id()
node_state = self.event_storage.get_node_state()
event_queue_state = self.event_storage.get_event_queue_state()

assert events == []
assert last_event is None
assert last_group_id is None
assert node_state is None
assert event_queue_state is False

def test_reset_events_full_database(self):
n_events = 10
expected_last_group_id = 4
expected_node_state = NodeState.SYNC
Expand All @@ -170,14 +188,41 @@ def test_clear_events_full_database(self):
assert node_state == expected_node_state
assert event_queue_state is True

self._test_clear_events()
self._test_reset_events()

node_state = self.event_storage.get_node_state()
event_queue_state = self.event_storage.get_event_queue_state()

assert node_state == expected_node_state
assert event_queue_state is True

def test_reset_all_full_database(self):
n_events = 10
expected_last_group_id = 4
expected_node_state = NodeState.SYNC

self._populate_events_and_last_group_id(n_events=n_events, last_group_id=4)
self.event_storage.save_node_state(expected_node_state)
self.event_storage.save_event_queue_state(True)

events = list(self.event_storage.iter_from_event(0))
last_group_id = self.event_storage.get_last_group_id()
node_state = self.event_storage.get_node_state()
event_queue_state = self.event_storage.get_event_queue_state()

assert len(events) == n_events
assert last_group_id == expected_last_group_id
assert node_state == expected_node_state
assert event_queue_state is True

self._test_reset_all()

node_state = self.event_storage.get_node_state()
event_queue_state = self.event_storage.get_event_queue_state()

assert node_state is None
assert event_queue_state is False


@pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb')
class EventStorageRocksDBTest(EventStorageBaseTest):
Expand Down
2 changes: 1 addition & 1 deletion tests/event/websocket/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def test_send_event_response():
b'"token_name":null,"token_symbol":null,"metadata":{"hash":"abc","spent_outputs":[],' \
b'"conflict_with":[],"voided_by":[],"received_by":[],"children":[],"twins":[],' \
b'"accumulated_weight":10.0,"score":20.0,"first_block":null,"height":100,' \
b'"validation":"validation"}},"group_id":null},"latest_event_id":10}'
b'"validation":"validation"},"aux_pow":null},"group_id":null},"latest_event_id":10}'

protocol.sendMessage.assert_called_once_with(expected_payload)

Expand Down
2 changes: 2 additions & 0 deletions tests/others/test_cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,15 @@ def test_event_queue_with_rocksdb_storage(self):
self.assertIsInstance(manager._event_manager, EventManager)
self.assertIsInstance(manager._event_manager._event_storage, EventRocksDBStorage)
self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory)
self.assertTrue(manager._enable_event_queue)

def test_event_queue_with_memory_storage(self):
manager = self._build(['--x-enable-event-queue', '--memory-storage'])

self.assertIsInstance(manager._event_manager, EventManager)
self.assertIsInstance(manager._event_manager._event_storage, EventMemoryStorage)
self.assertIsInstance(manager._event_manager._event_ws_factory, EventWebsocketFactory)
self.assertTrue(manager._enable_event_queue)

def test_event_queue_with_full_verification(self):
args = ['--x-enable-event-queue', '--memory-storage', '--x-full-verification']
Expand Down