Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from hathor.event import EventManager
from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage
from hathor.event.websocket import EventWebsocketFactory
from hathor.execution_manager import ExecutionManager
from hathor.feature_activation.bit_signaling_service import BitSignalingService
from hathor.feature_activation.feature import Feature
from hathor.feature_activation.feature_service import FeatureService
Expand Down Expand Up @@ -111,6 +112,7 @@ def __init__(self) -> None:
self._feature_service: Optional[FeatureService] = None
self._bit_signaling_service: Optional[BitSignalingService] = None

self._consensus: Optional[ConsensusAlgorithm] = None
self._daa: Optional[DifficultyAdjustmentAlgorithm] = None
self._cpu_mining_service: Optional[CpuMiningService] = None

Expand Down Expand Up @@ -149,6 +151,8 @@ def __init__(self) -> None:

self._soft_voided_tx_ids: Optional[set[bytes]] = None

self._execution_manager: ExecutionManager | None = None

def build(self) -> BuildArtifacts:
if self.artifacts is not None:
raise ValueError('cannot call build twice')
Expand All @@ -161,9 +165,7 @@ def build(self) -> BuildArtifacts:
pubsub = self._get_or_create_pubsub()

peer_id = self._get_peer_id()

soft_voided_tx_ids = self._get_soft_voided_tx_ids()
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub)
consensus_algorithm = self._get_or_create_consensus()

p2p_manager = self._get_p2p_manager()

Expand Down Expand Up @@ -305,6 +307,17 @@ def _get_peer_id(self) -> PeerId:
return self._peer_id
raise ValueError('peer_id not set')

def _get_or_create_execution_manager(self) -> ExecutionManager:
if self._execution_manager is None:
tx_storage = self._get_or_create_tx_storage()
event_manager = self._get_or_create_event_manager()
self._execution_manager = ExecutionManager(
tx_storage=tx_storage,
event_manager=event_manager,
)

return self._execution_manager

def _get_or_create_pubsub(self) -> PubSubManager:
if self._pubsub is None:
self._pubsub = PubSubManager(self._get_reactor())
Expand Down Expand Up @@ -445,10 +458,12 @@ def _get_or_create_event_manager(self) -> EventManager:
def _get_or_create_feature_service(self) -> FeatureService:
"""Return the FeatureService instance set on this builder, or a new one if not set."""
if self._feature_service is None:
reactor = self._get_reactor()
settings = self._get_or_create_settings()
tx_storage = self._get_or_create_tx_storage()
self._feature_service = FeatureService(
feature_settings=settings.FEATURE_ACTIVATION,
reactor=reactor,
settings=settings,
tx_storage=tx_storage
)

Expand All @@ -469,6 +484,21 @@ def _get_or_create_bit_signaling_service(self) -> BitSignalingService:

return self._bit_signaling_service

def _get_or_create_consensus(self) -> ConsensusAlgorithm:
if self._consensus is None:
soft_voided_tx_ids = self._get_soft_voided_tx_ids()
pubsub = self._get_or_create_pubsub()
execution_manager = self._get_or_create_execution_manager()
feature_service = self._get_or_create_feature_service()
self._consensus = ConsensusAlgorithm(
soft_voided_tx_ids=soft_voided_tx_ids,
pubsub=pubsub,
execution_manager=execution_manager,
feature_service=feature_service,
)

return self._consensus

def _get_or_create_verification_service(self) -> VerificationService:
if self._verification_service is None:
verifiers = self._get_or_create_vertex_verifiers()
Expand Down
16 changes: 14 additions & 2 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from hathor.daa import DifficultyAdjustmentAlgorithm
from hathor.event import EventManager
from hathor.exception import BuilderError
from hathor.execution_manager import ExecutionManager
from hathor.feature_activation.bit_signaling_service import BitSignalingService
from hathor.feature_activation.feature_service import FeatureService
from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
Expand Down Expand Up @@ -192,17 +193,28 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
full_verification = True

soft_voided_tx_ids = set(settings.SOFT_VOIDED_TX_IDS)
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub=pubsub)
execution_manager = ExecutionManager(
tx_storage=tx_storage,
event_manager=event_manager,
)

if self._args.x_enable_event_queue:
self.log.info('--x-enable-event-queue flag provided. '
'The events detected by the full node will be stored and can be retrieved by clients')

self.feature_service = FeatureService(
feature_settings=settings.FEATURE_ACTIVATION,
reactor=reactor,
settings=settings,
tx_storage=tx_storage
)

consensus_algorithm = ConsensusAlgorithm(
soft_voided_tx_ids,
pubsub=pubsub,
execution_manager=execution_manager,
feature_service=self.feature_service,
)

bit_signaling_service = BitSignalingService(
feature_settings=settings.FEATURE_ACTIVATION,
feature_service=self.feature_service,
Expand Down
25 changes: 22 additions & 3 deletions hathor/consensus/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from hathor.consensus.block_consensus import BlockConsensusAlgorithmFactory
from hathor.consensus.context import ConsensusAlgorithmContext
from hathor.consensus.transaction_consensus import TransactionConsensusAlgorithmFactory
from hathor.exception import ReorgTooLargeError
from hathor.execution_manager import ExecutionManager
from hathor.feature_activation.feature_service import FeatureService
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.transaction import BaseTransaction
Expand Down Expand Up @@ -55,13 +58,22 @@ class ConsensusAlgorithm:
b0 will not be propagated to the voided_by of b1, b2, and b3.
"""

def __init__(self, soft_voided_tx_ids: set[bytes], pubsub: PubSubManager) -> None:
def __init__(
self,
soft_voided_tx_ids: set[bytes],
pubsub: PubSubManager,
*,
execution_manager: ExecutionManager,
feature_service: FeatureService,
) -> None:
self._settings = get_settings()
self.log = logger.new()
self._pubsub = pubsub
self._feature_service = feature_service
self.soft_voided_tx_ids = frozenset(soft_voided_tx_ids)
self.block_algorithm_factory = BlockConsensusAlgorithmFactory()
self.transaction_algorithm_factory = TransactionConsensusAlgorithmFactory()
self._execution_manager = execution_manager

def create_context(self) -> ConsensusAlgorithmContext:
"""Handy method to create a context that can be used to access block and transaction algorithms."""
Expand All @@ -75,11 +87,11 @@ def update(self, base: BaseTransaction) -> None:
assert meta.validation.is_valid()
try:
self._unsafe_update(base)
except Exception:
except BaseException:
meta.add_voided_by(self._settings.CONSENSUS_FAIL_ID)
assert base.storage is not None
base.storage.save_transaction(base, only_metadata=True)
raise
self._execution_manager.crash_and_exit(reason=f'Consensus update failed for tx {base.hash_hex}')

def _unsafe_update(self, base: BaseTransaction) -> None:
"""Run a consensus update with its own context, indexes will be updated accordingly."""
Expand Down Expand Up @@ -130,6 +142,13 @@ def _unsafe_update(self, base: BaseTransaction) -> None:
reorg_size = old_best_block.get_height() - context.reorg_common_block.get_height()
assert old_best_block != new_best_block
assert reorg_size > 0

if not self._feature_service.is_reorg_valid(context.reorg_common_block):
# Raise an exception forcing the full node to exit if the reorg is too large for Feature Activation.
raise ReorgTooLargeError(
'Reorg is invalid. Time difference between common block and now is too large.'
)

context.pubsub.publish(HathorEvents.REORG_STARTED, old_best_height=best_height,
old_best_block=old_best_block, new_best_height=new_best_height,
new_best_block=new_best_block, common_block=context.reorg_common_block,
Expand Down
13 changes: 11 additions & 2 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def _subscribe_events(self) -> None:
for event in _SUBSCRIBE_EVENTS:
self._pubsub.subscribe(event, self._handle_hathor_event)

def load_started(self):
def load_started(self) -> None:
if not self._is_running:
return

Expand All @@ -142,7 +142,7 @@ def load_started(self):
)
self._event_storage.save_node_state(NodeState.LOAD)

def load_finished(self):
def load_finished(self) -> None:
if not self._is_running:
return

Expand All @@ -152,6 +152,15 @@ def load_finished(self):
)
self._event_storage.save_node_state(NodeState.SYNC)

def full_node_crashed(self) -> None:
if not self._is_running:
return

self._handle_event(
event_type=EventType.FULL_NODE_CRASHED,
event_args=EventArguments(),
)

def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None:
"""Handles a PubSub 'HathorEvents' event."""
event_type = EventType.from_hathor_event(hathor_event)
Expand Down
2 changes: 2 additions & 0 deletions hathor/event/model/event_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class EventType(Enum):
REORG_STARTED = 'REORG_STARTED'
REORG_FINISHED = 'REORG_FINISHED'
VERTEX_METADATA_CHANGED = 'VERTEX_METADATA_CHANGED'
FULL_NODE_CRASHED = 'FULL_NODE_CRASHED'

@classmethod
def from_hathor_event(cls, hathor_event: HathorEvents) -> 'EventType':
Expand Down Expand Up @@ -53,4 +54,5 @@ def data_type(self) -> type[BaseEventData]:
EventType.REORG_STARTED: ReorgData,
EventType.REORG_FINISHED: EmptyData,
EventType.VERTEX_METADATA_CHANGED: TxData,
EventType.FULL_NODE_CRASHED: EmptyData,
}
4 changes: 4 additions & 0 deletions hathor/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class InitializationError(HathorError):
"""


class ReorgTooLargeError(HathorError):
"""Raised when a reorg is too large for the full node to recover."""


class DoubleSpendingError(InvalidNewTransaction):
"""Raised when a new received tx/block is not valid because of a double spending.
"""
Expand Down
51 changes: 51 additions & 0 deletions hathor/execution_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from typing import NoReturn

from structlog import get_logger

from hathor.event import EventManager
from hathor.transaction.storage import TransactionStorage

logger = get_logger()


class ExecutionManager:
"""Class to manage actions related to full node execution."""
__slots__ = ('_log', '_tx_storage', '_event_manager')

def __init__(self, *, tx_storage: TransactionStorage, event_manager: EventManager) -> None:
self._log = logger.new()
self._tx_storage = tx_storage
self._event_manager = event_manager

def crash_and_exit(self, *, reason: str) -> NoReturn:
"""
Calling this function is a very extreme thing to do, so be careful. It should only be called when a
critical, unrecoverable failure happens. It crashes and exits the full node, rendering the database
corrupted, and requiring manual intervention. In other words, a restart with a clean database (from scratch
or a snapshot) will be required.
"""
self._tx_storage.full_node_crashed()
self._event_manager.full_node_crashed()
self._log.critical(
'Critical failure occurred, causing the full node to halt execution. Manual intervention is required.',
reason=reason,
exc_info=True
)
# We use os._exit() instead of sys.exit() or any other approaches because this is the only one Twisted
# doesn't catch.
os._exit(-1)
Loading