Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion hathor/consensus/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ def create_context(self) -> ConsensusAlgorithmContext:
def update(self, base: BaseTransaction) -> None:
assert base.storage is not None
assert base.storage.is_only_valid_allowed()
meta = base.get_metadata()
assert meta.validation.is_valid()
try:
self._unsafe_update(base)
except Exception:
meta = base.get_metadata()
meta.add_voided_by(settings.CONSENSUS_FAIL_ID)
assert base.storage is not None
base.storage.save_transaction(base, only_metadata=True)
Expand Down
2 changes: 2 additions & 0 deletions hathor/feature_activation/feature_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def get_state(self, *, block: Block, feature: Feature) -> FeatureState:
offset_to_boundary = height % self._feature_settings.evaluation_interval
offset_to_previous_boundary = offset_to_boundary or self._feature_settings.evaluation_interval
previous_boundary_height = height - offset_to_previous_boundary
assert previous_boundary_height >= 0
previous_boundary_block = self._get_ancestor_at_height(block=block, height=previous_boundary_height)
previous_boundary_state = self.get_state(block=previous_boundary_block, feature=feature)

Expand Down Expand Up @@ -167,6 +168,7 @@ def _get_ancestor_iteratively(*, block: Block, ancestor_height: int) -> Block:
"""Given a block, returns its ancestor at a specific height by iterating over its ancestors. This is slow."""
# TODO: there are further optimizations to be done here, the latest common block height could be persisted in
# metadata, so we could still use the height index if the requested height is before that height.
assert ancestor_height >= 0
ancestor = block
while ancestor.get_height() > ancestor_height:
ancestor = ancestor.get_block_parent()
Expand Down
4 changes: 2 additions & 2 deletions hathor/indexes/rocksdb_height_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ def _add(self, height: int, entry: IndexEntry, *, can_reorg: bool) -> None:
raise ValueError(f'parent hash required (current height: {cur_height}, new height: {height})')
elif height == cur_height + 1:
self._db.put((self._cf, key), value)
elif cur_tip != entry.hash:
elif self.get(height) != entry.hash:
if can_reorg:
self._del_from_height(height)
self._db.put((self._cf, key), value)
else:
raise ValueError('adding would cause a re-org, use can_reorg=True to accept re-orgs')
else:
# nothing to do (there are more blocks, but the block at height currently matches the added block)
assert cur_tip == entry.hash
pass

def add_new(self, height: int, block_hash: bytes, timestamp: int) -> None:
self._add(height, IndexEntry(block_hash, timestamp), can_reorg=False)
Expand Down
67 changes: 38 additions & 29 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from hathor.transaction.exceptions import TxValidationError
from hathor.transaction.storage import TransactionStorage
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.transaction.storage.tx_allow_scope import TxAllowScope
from hathor.types import Address, VertexId
from hathor.util import EnvironmentInfo, LogDuration, Random, Reactor, calculate_min_significant_weight, not_none
from hathor.wallet import BaseWallet
Expand Down Expand Up @@ -264,6 +265,8 @@ def start(self) -> None:

# Disable get transaction lock when initializing components
self.tx_storage.disable_lock()
# Open scope for initialization.
self.tx_storage.set_allow_scope(TxAllowScope.VALID | TxAllowScope.PARTIAL | TxAllowScope.INVALID)
# Initialize manager's components.
if self._full_verification:
self.tx_storage.reset_indexes()
Expand All @@ -274,6 +277,7 @@ def start(self) -> None:
self.tx_storage.finish_full_verification()
else:
self._initialize_components_new()
self.tx_storage.set_allow_scope(TxAllowScope.VALID)
self.tx_storage.enable_lock()

# Metric starts to capture data
Expand Down Expand Up @@ -394,9 +398,8 @@ def _initialize_components_full_verification(self) -> None:

# self.start_profiler()
self.log.debug('reset all metadata')
with self.tx_storage.allow_partially_validated_context():
for tx in self.tx_storage.get_all_transactions():
tx.reset_metadata()
for tx in self.tx_storage.get_all_transactions():
tx.reset_metadata()

self.log.debug('load blocks and transactions')
for tx in self.tx_storage._topological_sort_dfs():
Expand Down Expand Up @@ -431,9 +434,10 @@ def _initialize_components_full_verification(self) -> None:
try:
# TODO: deal with invalid tx
if tx.can_validate_full():
self.tx_storage.add_to_indexes(tx)
assert tx.validate_full(skip_block_weight_verification=skip_block_weight_verification)
self.consensus_algorithm.update(tx)
self.tx_storage.add_to_indexes(tx)
with self.tx_storage.allow_only_valid_context():
self.consensus_algorithm.update(tx)
self.tx_storage.indexes.update(tx)
if self.tx_storage.indexes.mempool_tips is not None:
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
Expand All @@ -442,8 +446,7 @@ def _initialize_components_full_verification(self) -> None:
self.tx_storage.save_transaction(tx, only_metadata=True)
else:
assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification)
with self.tx_storage.allow_partially_validated_context():
self.tx_storage.save_transaction(tx, only_metadata=True)
self.tx_storage.save_transaction(tx, only_metadata=True)
except (InvalidNewTransaction, TxValidationError):
self.log.error('unexpected error when initializing', tx=tx, exc_info=True)
raise
Expand Down Expand Up @@ -478,6 +481,8 @@ def _initialize_components_full_verification(self) -> None:
# we have to have a best_block by now
# assert best_block is not None

self.tx_storage.indexes._manually_initialize(self.tx_storage)

self.log.debug('done loading transactions')

# Check if all checkpoints in database are ok
Expand Down Expand Up @@ -585,7 +590,8 @@ def _verify_soft_voided_txs(self) -> None:
# that already has the soft voided transactions marked
for soft_voided_id in self.consensus_algorithm.soft_voided_tx_ids:
try:
soft_voided_tx = self.tx_storage.get_transaction(soft_voided_id)
with self.tx_storage.allow_only_valid_context():
soft_voided_tx = self.tx_storage.get_transaction(soft_voided_id)
except TransactionDoesNotExist:
# This database does not have this tx that should be soft voided
# so it's fine, we will mark it as soft voided when we get it through sync
Expand Down Expand Up @@ -654,10 +660,11 @@ def _sync_v2_resume_validations(self) -> None:
for tx_hash in self.tx_storage.indexes.deps.iter():
if not self.tx_storage.transaction_exists(tx_hash):
continue
tx = self.tx_storage.get_transaction(tx_hash)
with self.tx_storage.allow_partially_validated_context():
tx = self.tx_storage.get_transaction(tx_hash)
if tx.get_metadata().validation.is_final():
depended_final_txs.append(tx)
self.sync_v2_step_validations(depended_final_txs, quiet=False)
self.sync_v2_step_validations(depended_final_txs, quiet=True)
self.log.debug('pending validations finished')

def add_listen_address(self, addr: str) -> None:
Expand Down Expand Up @@ -980,7 +987,7 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
except HathorError as e:
if not fails_silently:
raise InvalidNewTransaction('consensus update failed') from e
self.log.warn('on_new_tx(): consensus update failed', tx=tx.hash_hex)
self.log.warn('on_new_tx(): consensus update failed', tx=tx.hash_hex, exc_info=True)
return False

assert tx.validate_full(skip_block_weight_verification=True, reject_locked_reward=reject_locked_reward)
Expand Down Expand Up @@ -1028,24 +1035,26 @@ def sync_v2_step_validations(self, txs: Iterable[BaseTransaction], *, quiet: boo
for ready_tx in txs:
assert ready_tx.hash is not None
self.tx_storage.indexes.deps.remove_ready_for_validation(ready_tx.hash)
it_next_ready = self.tx_storage.indexes.deps.next_ready_for_validation(self.tx_storage)
for tx in map(self.tx_storage.get_transaction, it_next_ready):
assert tx.hash is not None
tx.update_initial_metadata()
try:
# XXX: `reject_locked_reward` might not apply, partial validation is only used on sync-v2
# TODO: deal with `reject_locked_reward` on sync-v2
assert tx.validate_full(reject_locked_reward=True)
except (AssertionError, HathorError):
# TODO
raise
else:
self.tx_storage.add_to_indexes(tx)
self.consensus_algorithm.update(tx)
self.tx_storage.indexes.update(tx)
if self.tx_storage.indexes.mempool_tips:
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
self.tx_fully_validated(tx, quiet=quiet)
with self.tx_storage.allow_partially_validated_context():
for tx in map(self.tx_storage.get_transaction,
self.tx_storage.indexes.deps.next_ready_for_validation(self.tx_storage)):
assert tx.hash is not None
tx.update_initial_metadata()
with self.tx_storage.allow_only_valid_context():
try:
# XXX: `reject_locked_reward` might not apply, partial validation is only used on sync-v2
# TODO: deal with `reject_locked_reward` on sync-v2
assert tx.validate_full(reject_locked_reward=True)
except (AssertionError, HathorError):
# TODO
raise
else:
self.tx_storage.add_to_indexes(tx)
self.consensus_algorithm.update(tx)
self.tx_storage.indexes.update(tx)
if self.tx_storage.indexes.mempool_tips:
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
self.tx_fully_validated(tx, quiet=quiet)

def tx_fully_validated(self, tx: BaseTransaction, *, quiet: bool) -> None:
""" Handle operations that need to happen once the tx becomes fully validated.
Expand Down
3 changes: 3 additions & 0 deletions hathor/p2p/sync_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,17 @@ def is_errored(self) -> bool:
"""Whether the manager entered an error state"""
raise NotImplementedError

@abstractmethod
def is_sync_enabled(self) -> bool:
"""Return true if the sync is enabled."""
raise NotImplementedError

@abstractmethod
def enable_sync(self) -> None:
"""Enable sync."""
raise NotImplementedError

@abstractmethod
def disable_sync(self) -> None:
"""Disable sync."""
raise NotImplementedError
61 changes: 51 additions & 10 deletions hathor/simulator/fake_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def getPeerCertificate(self) -> X509:


class FakeConnection:
def __init__(self, manager1: 'HathorManager', manager2: 'HathorManager', *, latency: float = 0):
def __init__(self, manager1: 'HathorManager', manager2: 'HathorManager', *, latency: float = 0,
autoreconnect: bool = False):
"""
:param: latency: Latency between nodes in seconds
"""
Expand All @@ -51,20 +52,14 @@ def __init__(self, manager1: 'HathorManager', manager2: 'HathorManager', *, late
self.manager2 = manager2

self.latency = latency
self.is_connected = True

self._proto1 = manager1.connections.server_factory.buildProtocol(HostnameAddress(b'fake', 0))
self._proto2 = manager2.connections.client_factory.buildProtocol(HostnameAddress(b'fake', 0))

self.tr1 = HathorStringTransport(self._proto2.my_peer)
self.tr2 = HathorStringTransport(self._proto1.my_peer)
self.autoreconnect = autoreconnect
self.is_connected = False

self._do_buffering = True
self._buf1: deque[str] = deque()
self._buf2: deque[str] = deque()

self._proto1.makeConnection(self.tr1)
self._proto2.makeConnection(self.tr2)
self.reconnect()

@property
def proto1(self):
Expand All @@ -79,6 +74,35 @@ def disable_idle_timeout(self):
self._proto1.disable_idle_timeout()
self._proto2.disable_idle_timeout()

def is_both_synced(self) -> bool:
    """Return True only when both peers are connected, READY, not errored, and report being synced.

    Useful for "step loops" that run until sync completes instead of guessing an iteration count.
    """
    from hathor.p2p.states.ready import ReadyState
    # A connection being torn down can never be considered synced.
    aborting1 = self._proto1.aborting
    aborting2 = self._proto2.aborting
    if aborting1 or aborting2:
        self.log.debug('conn aborting', conn1_aborting=aborting1, conn2_aborting=aborting2)
        return False
    peer1_state = self._proto1.state
    peer2_state = self._proto2.state
    ready1 = isinstance(peer1_state, ReadyState)
    ready2 = isinstance(peer2_state, ReadyState)
    if not (ready1 and ready2):
        self.log.debug('peer not ready', peer1_ready=ready1, peer2_ready=ready2)
        return False
    # mypy can't narrow from the combined check above, so narrow explicitly.
    assert isinstance(peer1_state, ReadyState)
    assert isinstance(peer2_state, ReadyState)
    errored1 = peer1_state.sync_manager.is_errored()
    errored2 = peer2_state.sync_manager.is_errored()
    if errored1 or errored2:
        self.log.debug('peer errored', peer1_errored=errored1, peer2_errored=errored2)
        return False
    synced1 = peer1_state.sync_manager.is_synced()
    synced2 = peer2_state.sync_manager.is_synced()
    if not (synced1 and synced2):
        self.log.debug('peer not synced', peer1_synced=synced1, peer2_synced=synced2)
        return False
    return True

def can_step(self) -> bool:
"""Short-hand check that can be used to make "step loops" without having to guess the number of iterations."""
from hathor.p2p.states.ready import ReadyState
Expand Down Expand Up @@ -155,6 +179,9 @@ def run_one_step(self, debug=False, force=False):
if debug:
self.log.debug('[2->1] delivered', line=line2)

if self.autoreconnect and self._proto1.aborting and self._proto2.aborting:
self.reconnect()

return True

def run_until_empty(self, max_steps: Optional[int] = None, debug: bool = False, force: bool = False) -> None:
Expand All @@ -178,6 +205,20 @@ def disconnect(self, reason):
self._proto2.connectionLost(reason)
self.is_connected = False

def reconnect(self) -> None:
    """Tear down the current link (when connected) and wire up a brand-new protocol/transport pair."""
    from twisted.python.failure import Failure
    if self.is_connected:
        self.disconnect(Failure(Exception('forced reconnection')))
    # Discard anything still buffered from the previous link.
    for buf in (self._buf1, self._buf2):
        buf.clear()
    self._proto1 = self.manager1.connections.server_factory.buildProtocol(HostnameAddress(b'fake', 0))
    self._proto2 = self.manager2.connections.client_factory.buildProtocol(HostnameAddress(b'fake', 0))
    # Each transport impersonates the remote side, hence the crossed my_peer references.
    self.tr1 = HathorStringTransport(self._proto2.my_peer)
    self.tr2 = HathorStringTransport(self._proto1.my_peer)
    self._proto1.makeConnection(self.tr1)
    self._proto2.makeConnection(self.tr2)
    self.is_connected = True

def is_empty(self):
if self._do_buffering and (self._buf1 or self._buf2):
return False
Expand Down
33 changes: 32 additions & 1 deletion hathor/simulator/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Callable

if TYPE_CHECKING:
from hathor.simulator.fake_connection import FakeConnection
from hathor.simulator.miner import AbstractMiner
from hathor.simulator.tx_generator import RandomTransactionGenerator
from hathor.wallet import BaseWallet
Expand Down Expand Up @@ -71,3 +72,33 @@ def reset(self) -> None:
def should_stop(self) -> bool:
diff = self.tx_generator.transactions_found - self.initial_counter
return diff >= self.quantity


class StopWhenTrue(Trigger):
    """Stop the simulation as soon as the given zero-argument predicate returns true."""

    def __init__(self, fn: Callable[[], bool]) -> None:
        # Predicate polled by the simulator on each check.
        self.fn = fn

    def should_stop(self) -> bool:
        return self.fn()


class StopWhenSynced(Trigger):
    """Stop the simulation when both agents running on a connection report that they have synced."""

    def __init__(self, connection: 'FakeConnection') -> None:
        # The fake connection whose two peers are checked for sync completion.
        self.connection = connection

    def should_stop(self) -> bool:
        # Delegates entirely to the connection's combined ready/errored/synced check.
        return self.connection.is_both_synced()


class All(Trigger):
    """Aggregator trigger that fires only when every sub-trigger fires.

    XXX: note that not all sub-triggers will be called, this will short-circuit, in order, if one sub-trigger returns
    False, which follows the same behavior of builtins.all"""

    def __init__(self, sub_triggers: list[Trigger]) -> None:
        # Checked in order; evaluation stops at the first False.
        self._sub_triggers = sub_triggers

    def should_stop(self) -> bool:
        for sub_trigger in self._sub_triggers:
            if not sub_trigger.should_stop():
                return False
        return True
1 change: 1 addition & 0 deletions hathor/transaction/base_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ def validate_full(self, skip_block_weight_verification: bool = False, sync_check
from hathor.transaction.transaction_metadata import ValidationState

meta = self.get_metadata()

# skip full validation when it is a checkpoint
if meta.validation.is_checkpoint():
self.set_validation(ValidationState.CHECKPOINT_FULL)
Expand Down
5 changes: 3 additions & 2 deletions hathor/transaction/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ def has_basic_block_parent(self) -> bool:
if not self.storage.transaction_exists(parent_block_hash):
return False
metadata = self.storage.get_metadata(parent_block_hash)
assert metadata is not None
if metadata is None:
return False
return metadata.validation.is_at_least_basic()

def verify_basic(self, skip_block_weight_verification: bool = False) -> None:
Expand All @@ -314,7 +315,7 @@ def verify_checkpoint(self, checkpoints: list[Checkpoint]) -> None:
raise CheckpointError(f'Invalid new block {self.hash_hex}: checkpoint hash does not match')
else:
# TODO: check whether self is a parent of any checkpoint-valid block, this is left for a future PR
raise NotImplementedError
pass

def verify_weight(self) -> None:
"""Validate minimum block difficulty."""
Expand Down
1 change: 1 addition & 0 deletions hathor/transaction/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, rocksdb_storage: RocksDBStorage, indexes: Optional[IndexesMan
self._cf_attr = rocksdb_storage.get_or_create_column_family(_CF_NAME_ATTR)
self._cf_migrations = rocksdb_storage.get_or_create_column_family(_CF_NAME_MIGRATIONS)

self._rocksdb_storage = rocksdb_storage
self._db = rocksdb_storage.get_db()
super().__init__(indexes=indexes)

Expand Down
Loading