Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 55 additions & 97 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,10 @@ def start(self) -> None:
# Initialize manager's components.
if self._full_verification:
self.tx_storage.reset_indexes()
self._initialize_components()
# Before calling self._initialize_components() I start 'full verification' mode and after that I need to
# finish it. It's just to know if the full node has stopped a full initialization in the middle
self._initialize_components_full_verification()
# Before calling self._initialize_components_full_verification() I start 'full verification' mode and
# after that I need to finish it. It's just to know if the full node has stopped a full initialization
# in the middle.
self.tx_storage.finish_full_verification()
else:
self._initialize_components_new()
Expand Down Expand Up @@ -358,13 +359,14 @@ def stop_profiler(self, save_to: Optional[str] = None) -> None:
if save_to:
self.profiler.dump_stats(save_to)

def _initialize_components(self) -> None:
def _initialize_components_full_verification(self) -> None:
"""You are not supposed to run this method manually. You should run `doStart()` to initialize the
manager.

This method runs through all transactions, verifying them and updating our wallet.
"""
assert not self._enable_event_queue, 'this method cannot be used if the events feature is enabled.'
assert self._full_verification

self.log.info('initialize')
if self.wallet:
Expand All @@ -382,46 +384,22 @@ def _initialize_components(self) -> None:
self.tx_storage.pre_init()
assert self.tx_storage.indexes is not None

# After introducing soft voided transactions we need to guarantee the full node is not using
# a database that already has the soft voided transaction before marking them in the metadata
# Any new sync from the beginning should work fine or starting with the latest snapshot
# that already has the soft voided transactions marked
for soft_voided_id in self.consensus_algorithm.soft_voided_tx_ids:
try:
soft_voided_tx = self.tx_storage.get_transaction(soft_voided_id)
except TransactionDoesNotExist:
# This database does not have this tx that should be soft voided
# so it's fine, we will mark it as soft voided when we get it through sync
pass
else:
soft_voided_meta = soft_voided_tx.get_metadata()
voided_set = soft_voided_meta.voided_by or set()
# If the tx is not marked as soft voided, then we can't continue the initialization
if settings.SOFT_VOIDED_ID not in voided_set:
self.log.error(
'Error initializing node. Your database is not compatible with the current version of the'
' full node. You must use the latest available snapshot or sync from the beginning.'
)
sys.exit(-1)

assert {soft_voided_id, settings.SOFT_VOIDED_ID}.issubset(voided_set)
self._verify_soft_voided_txs()

# Checkpoints as {height: hash}
checkpoint_heights = {}
for cp in self.checkpoints:
checkpoint_heights[cp.height] = cp.hash

# self.start_profiler()
if self._full_verification:
self.log.debug('reset all metadata')
with self.tx_storage.allow_partially_validated_context():
for tx in self.tx_storage.get_all_transactions():
tx.reset_metadata()
self.log.debug('reset all metadata')
with self.tx_storage.allow_partially_validated_context():
for tx in self.tx_storage.get_all_transactions():
tx.reset_metadata()

self.log.debug('load blocks and transactions')
for tx in self.tx_storage._topological_sort_dfs():
if self._full_verification:
tx.update_initial_metadata()
tx.update_initial_metadata()

assert tx.hash is not None

Expand Down Expand Up @@ -450,34 +428,21 @@ def _initialize_components(self) -> None:
skip_block_weight_verification = False

try:
if self._full_verification:
# TODO: deal with invalid tx
if tx.can_validate_full():
self.tx_storage.add_to_indexes(tx)
assert tx.validate_full(skip_block_weight_verification=skip_block_weight_verification)
self.consensus_algorithm.update(tx)
self.tx_storage.indexes.update(tx)
if self.tx_storage.indexes.mempool_tips is not None:
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
if self.tx_storage.indexes.deps is not None:
self.sync_v2_step_validations([tx])
self.tx_storage.save_transaction(tx, only_metadata=True)
else:
assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification)
with self.tx_storage.allow_partially_validated_context():
self.tx_storage.save_transaction(tx, only_metadata=True)
else:
# TODO: deal with invalid tx
if not tx_meta.validation.is_final():
if not tx_meta.validation.is_checkpoint():
assert tx_meta.validation.is_at_least_basic(), f'invalid: {tx.hash_hex}'
elif tx.is_transaction and tx_meta.first_block is None and not tx_meta.voided_by:
assert self.tx_storage.indexes is not None
if self.tx_storage.indexes.mempool_tips:
self.tx_storage.indexes.mempool_tips.update(tx)
# TODO: deal with invalid tx
if tx.can_validate_full():
self.tx_storage.add_to_indexes(tx)
if tx.is_transaction and tx_meta.voided_by:
self.tx_storage.del_from_indexes(tx)
assert tx.validate_full(skip_block_weight_verification=skip_block_weight_verification)
self.consensus_algorithm.update(tx)
self.tx_storage.indexes.update(tx)
if self.tx_storage.indexes.mempool_tips is not None:
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
if self.tx_storage.indexes.deps is not None:
self.sync_v2_step_validations([tx])
self.tx_storage.save_transaction(tx, only_metadata=True)
else:
assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification)
with self.tx_storage.allow_partially_validated_context():
self.tx_storage.save_transaction(tx, only_metadata=True)
except (InvalidNewTransaction, TxValidationError):
self.log.error('unexpected error when initializing', tx=tx, exc_info=True)
raise
Expand Down Expand Up @@ -524,18 +489,8 @@ def _initialize_components(self) -> None:
sys.exit()

# restart all validations possible
if self.tx_storage.indexes.deps and self.tx_storage.indexes.deps.has_needed_tx():
self.log.debug('run pending validations')
depended_final_txs: list[BaseTransaction] = []
for tx_hash in self.tx_storage.indexes.deps.iter():
if not self.tx_storage.transaction_exists(tx_hash):
continue
tx = self.tx_storage.get_transaction(tx_hash)
if tx.get_metadata().validation.is_final():
depended_final_txs.append(tx)
if self.tx_storage.indexes.deps is not None:
self.sync_v2_step_validations(depended_final_txs)
self.log.debug('pending validations finished')
if self.tx_storage.indexes.deps:
self._sync_v2_resume_validations()

best_height = self.tx_storage.get_height_best_block()
if best_height != h:
Expand Down Expand Up @@ -577,30 +532,7 @@ def _initialize_components_new(self) -> None:
self.log.warn('The last started time is to the future of the current time',
started_at=started_at, last_started_at=last_started_at)

# TODO: this could be either refactored into a migration or at least into it's own method
# After introducing soft voided transactions we need to guarantee the full node is not using
# a database that already has the soft voided transaction before marking them in the metadata
# Any new sync from the beginning should work fine or starting with the latest snapshot
# that already has the soft voided transactions marked
for soft_voided_id in self.consensus_algorithm.soft_voided_tx_ids:
try:
soft_voided_tx = self.tx_storage.get_transaction(soft_voided_id)
except TransactionDoesNotExist:
# This database does not have this tx that should be soft voided
# so it's fine, we will mark it as soft voided when we get it through sync
pass
else:
soft_voided_meta = soft_voided_tx.get_metadata()
voided_set = soft_voided_meta.voided_by or set()
# If the tx is not marked as soft voided, then we can't continue the initialization
if settings.SOFT_VOIDED_ID not in voided_set:
self.log.error(
'Error initializing node. Your database is not compatible with the current version of the'
' full node. You must use the latest available snapshot or sync from the beginning.'
)
sys.exit(-1)

assert {soft_voided_id, settings.SOFT_VOIDED_ID}.issubset(voided_set)
self._verify_soft_voided_txs()

# TODO: move support for full-verification here, currently we rely on the original _initialize_components
# method for full-verification to work, if we implement it here we'll reduce a lot of duplicate and
Expand Down Expand Up @@ -639,6 +571,32 @@ def _initialize_components_new(self) -> None:
self.log.info('ready', vertex_count=vertex_count,
total_load_time=total_load_time, **environment_info)

def _verify_soft_voided_txs(self) -> None:
# TODO: this could be either refactored into a migration or at least into it's own method
# After introducing soft voided transactions we need to guarantee the full node is not using
# a database that already has the soft voided transaction before marking them in the metadata
# Any new sync from the beginning should work fine or starting with the latest snapshot
# that already has the soft voided transactions marked
for soft_voided_id in self.consensus_algorithm.soft_voided_tx_ids:
try:
soft_voided_tx = self.tx_storage.get_transaction(soft_voided_id)
except TransactionDoesNotExist:
# This database does not have this tx that should be soft voided
# so it's fine, we will mark it as soft voided when we get it through sync
pass
else:
soft_voided_meta = soft_voided_tx.get_metadata()
voided_set = soft_voided_meta.voided_by or set()
# If the tx is not marked as soft voided, then we can't continue the initialization
if settings.SOFT_VOIDED_ID not in voided_set:
self.log.error(
'Error initializing node. Your database is not compatible with the current version of the'
' full node. You must use the latest available snapshot or sync from the beginning.'
)
sys.exit(-1)

assert {soft_voided_id, settings.SOFT_VOIDED_ID}.issubset(voided_set)

def _verify_checkpoints(self) -> None:
""" Method to verify if all checkpoints that exist in the database have the correct hash and are winners.

Expand Down