Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ def _get_or_create_consensus(self) -> ConsensusAlgorithm:
nc_log_storage=self._get_or_create_nc_log_storage(),
nc_calls_sorter=nc_calls_sorter,
feature_service=self._get_or_create_feature_service(),
tx_storage=self._get_or_create_tx_storage(),
)

return self._consensus
Expand Down
124 changes: 67 additions & 57 deletions hathor/consensus/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
from __future__ import annotations

from collections import defaultdict
from typing import TYPE_CHECKING, Callable
from typing import TYPE_CHECKING, Callable, assert_never

from structlog import get_logger

from hathor.consensus.block_consensus import BlockConsensusAlgorithmFactory
from hathor.consensus.context import ConsensusAlgorithmContext
from hathor.consensus.transaction_consensus import TransactionConsensusAlgorithmFactory
from hathor.execution_manager import non_critical_code
from hathor.feature_activation.utils import Features
from hathor.feature_activation.feature import Feature
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.transaction import BaseTransaction, Transaction
from hathor.transaction import BaseTransaction, Block, Transaction
from hathor.transaction.exceptions import RewardLocked
from hathor.util import not_none

Expand Down Expand Up @@ -78,6 +78,7 @@ def __init__(
pubsub: PubSubManager,
*,
settings: HathorSettings,
tx_storage: TransactionStorage,
runner_factory: RunnerFactory,
nc_calls_sorter: NCSorterCallable,
nc_log_storage: NCLogStorage,
Expand All @@ -87,6 +88,7 @@ def __init__(
self._settings = settings
self.log = logger.new()
self._pubsub = pubsub
self.tx_storage = tx_storage
self.nc_storage_factory = nc_storage_factory
self.soft_voided_tx_ids = frozenset(soft_voided_tx_ids)
self.block_algorithm_factory = BlockConsensusAlgorithmFactory(
Expand All @@ -109,8 +111,7 @@ def unsafe_update(self, base: BaseTransaction) -> None:
if this method throws any exception.
"""
from hathor.transaction import Block, Transaction
assert base.storage is not None
assert base.storage.is_only_valid_allowed()
assert self.tx_storage.is_only_valid_allowed()
meta = base.get_metadata()
assert meta.validation.is_valid()

Expand All @@ -122,12 +123,10 @@ def unsafe_update(self, base: BaseTransaction) -> None:
# this context instance will live only while this update is running
context = self.create_context()

assert base.storage is not None
storage = base.storage
best_height, best_tip = storage.indexes.height.get_height_tip()
best_height, best_tip = self.tx_storage.indexes.height.get_height_tip()

# This has to be called before the removal of vertices, otherwise this call may fail.
old_best_block = base.storage.get_transaction(best_tip)
old_best_block = self.tx_storage.get_block(best_tip)

if isinstance(base, Transaction):
context.transaction_algorithm.update_consensus(base)
Expand All @@ -139,10 +138,10 @@ def unsafe_update(self, base: BaseTransaction) -> None:
# signal a mempool tips index update for all affected transactions,
# because that index is used on _compute_vertices_that_became_invalid below.
for tx_affected in _sorted_affected_txs(context.txs_affected):
storage.indexes.mempool_tips.update(tx_affected)
self.tx_storage.indexes.mempool_tips.update(tx_affected)

txs_to_remove: list[BaseTransaction] = []
new_best_height, new_best_tip = storage.indexes.height.get_height_tip()
new_best_height, new_best_tip = self.tx_storage.indexes.height.get_height_tip()

if context.reorg_info is not None:
if new_best_height < best_height:
Expand All @@ -152,20 +151,22 @@ def unsafe_update(self, base: BaseTransaction) -> None:
)

# XXX: this method will mark as INVALID all transactions in the mempool that became invalid after the reorg
txs_to_remove.extend(self._compute_vertices_that_became_invalid(storage, new_best_height))
txs_to_remove.extend(
self._compute_vertices_that_became_invalid(new_best_block=context.reorg_info.new_best_block)
)

if txs_to_remove:
self.log.warn('some transactions on the mempool became invalid and will be removed',
count=len(txs_to_remove))
# XXX: because transactions in `txs_to_remove` are marked as invalid, we need this context to be
# able to remove them
with storage.allow_invalid_context():
self._remove_transactions(txs_to_remove, storage, context)
with self.tx_storage.allow_invalid_context():
self._remove_transactions(txs_to_remove, context)

# emit the reorg started event if needed
if context.reorg_info is not None:
assert isinstance(old_best_block, Block)
new_best_block = base.storage.get_transaction(new_best_tip)
new_best_block = self.tx_storage.get_transaction(new_best_tip)
reorg_size = old_best_block.get_height() - context.reorg_info.common_block.get_height()
# TODO: After we remove block ties, should the assert below be true?
# assert old_best_block.get_metadata().voided_by
Expand All @@ -190,10 +191,9 @@ def unsafe_update(self, base: BaseTransaction) -> None:

# finally signal an index update for all affected transactions
for tx_affected in _sorted_affected_txs(context.txs_affected):
assert tx_affected.storage is not None
tx_affected.storage.indexes.update_critical_indexes(tx_affected)
self.tx_storage.indexes.update_critical_indexes(tx_affected)
with non_critical_code(self.log):
tx_affected.storage.indexes.update_non_critical_indexes(tx_affected)
self.tx_storage.indexes.update_non_critical_indexes(tx_affected)
context.pubsub.publish(HathorEvents.CONSENSUS_TX_UPDATE, tx=tx_affected)

# signal all transactions of which the execution succeeded
Expand Down Expand Up @@ -242,8 +242,7 @@ def _filter_out_soft_voided_entries(self, tx: BaseTransaction, voided_by: set[by
continue
if h in self.soft_voided_tx_ids:
continue
assert tx.storage is not None
tx3 = tx.storage.get_transaction(h)
tx3 = self.tx_storage.get_transaction(h)
tx3_meta = tx3.get_metadata()
tx3_voided_by: set[bytes] = tx3_meta.voided_by or set()
if not (self.soft_voided_tx_ids & tx3_voided_by):
Expand All @@ -267,21 +266,15 @@ def _filter_out_nc_fail_entries(self, tx: BaseTransaction, voided_by: set[bytes]
continue
if h == tx.hash:
continue
assert tx.storage is not None
tx2 = tx.storage.get_transaction(h)
tx2 = self.tx_storage.get_transaction(h)
tx2_meta = tx2.get_metadata()
tx2_voided_by: set[bytes] = tx2_meta.voided_by or set()
if NC_EXECUTION_FAIL_ID in tx2_voided_by:
ret.discard(h)
assert NC_EXECUTION_FAIL_ID not in ret
return ret

def _remove_transactions(
self,
txs: list[BaseTransaction],
storage: TransactionStorage,
context: ConsensusAlgorithmContext,
) -> None:
def _remove_transactions(self, txs: list[BaseTransaction], context: ConsensusAlgorithmContext) -> None:
"""Will remove all the transactions on the list from the database.

Special notes:
Expand Down Expand Up @@ -319,38 +312,32 @@ def _remove_transactions(
spent_tx_meta.spent_outputs[tx_input.index].remove(tx.hash)
context.save(spent_tx)
for parent_hash, children_to_remove in parents_to_update.items():
parent_tx = storage.get_transaction(parent_hash)
parent_tx = self.tx_storage.get_transaction(parent_hash)
for child in children_to_remove:
storage.vertex_children.remove_child(parent_tx, child)
self.tx_storage.vertex_children.remove_child(parent_tx, child)
context.save(parent_tx)
for tx in txs:
self.log.debug('remove transaction', tx=tx.hash_hex)
storage.remove_transaction(tx)
self.tx_storage.remove_transaction(tx)

def _compute_vertices_that_became_invalid(
self,
storage: TransactionStorage,
new_best_height: int,
) -> list[BaseTransaction]:
def _compute_vertices_that_became_invalid(self, *, new_best_block: Block) -> list[BaseTransaction]:
"""This method will look for transactions in the mempool that have become invalid after a reorg."""
from hathor.transaction.storage.traversal import BFSTimestampWalk
from hathor.transaction.validation_state import ValidationState

mempool_tips = list(storage.indexes.mempool_tips.iter(storage))
mempool_tips = list(self.tx_storage.indexes.mempool_tips.iter(self.tx_storage))
if not mempool_tips:
# Mempool is empty, nothing to remove.
return []

mempool_rules: tuple[Callable[[Transaction], bool], ...] = (
lambda tx: self._reward_lock_mempool_rule(tx, new_best_height),
lambda tx: self._unknown_contract_mempool_rule(tx),
lambda tx: self._nano_activation_rule(storage, tx),
lambda tx: self._fee_tokens_activation_rule(storage, tx),
self._checkdatasig_count_rule,
lambda tx: self._reward_lock_mempool_rule(tx, new_best_block.get_height()),
lambda tx: self._feature_activation_rules(tx, new_best_block),
self._unknown_contract_mempool_rule,
)

find_invalid_bfs = BFSTimestampWalk(
storage, is_dag_funds=True, is_dag_verifications=True, is_left_to_right=False
self.tx_storage, is_dag_funds=True, is_dag_verifications=True, is_left_to_right=False
)

invalid_txs: set[BaseTransaction] = set()
Expand All @@ -373,7 +360,7 @@ def _compute_vertices_that_became_invalid(
# From the invalid txs, mark all vertices to the right as invalid. This includes both txs and blocks.
to_remove: list[BaseTransaction] = []
find_to_remove_bfs = BFSTimestampWalk(
storage, is_dag_funds=True, is_dag_verifications=True, is_left_to_right=True
self.tx_storage, is_dag_funds=True, is_dag_verifications=True, is_left_to_right=True
)
for vertex in find_to_remove_bfs.run(invalid_txs, skip_root=False):
vertex.set_validation(ValidationState.INVALID)
Expand Down Expand Up @@ -416,15 +403,40 @@ def _unknown_contract_mempool_rule(self, tx: Transaction) -> bool:
return False
return True

def _nano_activation_rule(self, storage: TransactionStorage, tx: Transaction) -> bool:
def _feature_activation_rules(self, tx: Transaction, new_best_block: Block) -> bool:
"""Check whether a tx became invalid because of some feature state of the new best block."""
features = self.feature_service.get_feature_states(vertex=new_best_block)

for feature, feature_state in features.items():
is_active = feature_state.is_active()
match feature:
case Feature.NANO_CONTRACTS:
if not self._nano_activation_rule(tx, is_active):
return False
case Feature.FEE_TOKENS:
if not self._fee_tokens_activation_rule(tx, is_active):
return False
case Feature.COUNT_CHECKDATASIG_OP:
if not self._checkdatasig_count_rule(tx):
return False
case (
Feature.INCREASE_MAX_MERKLE_PATH_LENGTH
| Feature.NOP_FEATURE_1
| Feature.NOP_FEATURE_2
| Feature.NOP_FEATURE_3
):
# These features do not affect transactions.
pass
case _:
assert_never(feature)

return True

def _nano_activation_rule(self, tx: Transaction, is_active: bool) -> bool:
"""Check whether a tx became invalid because the reorg changed the nano feature activation state."""
from hathor.nanocontracts import OnChainBlueprint

best_block = storage.get_best_block()
features = Features.from_vertex(
settings=self._settings, vertex=best_block, feature_service=self.feature_service
)
if features.nanocontracts:
if is_active:
# When nano is active, this rule has no effect.
return True

Expand All @@ -437,18 +449,14 @@ def _nano_activation_rule(self, storage: TransactionStorage, tx: Transaction) ->

return True

def _fee_tokens_activation_rule(self, storage: TransactionStorage, tx: Transaction) -> bool:
def _fee_tokens_activation_rule(self, tx: Transaction, is_active: bool) -> bool:
"""
Check whether a tx became invalid because the reorg changed the fee-based tokens feature activation state.
"""
from hathor.transaction.token_creation_tx import TokenCreationTransaction
from hathor.transaction.token_info import TokenVersion

best_block = storage.get_best_block()
features = Features.from_vertex(
settings=self._settings, vertex=best_block, feature_service=self.feature_service
)
if features.fee_tokens:
if is_active:
# When fee-based tokens feature is active, this rule has no effect.
return True

Expand All @@ -462,9 +470,11 @@ def _fee_tokens_activation_rule(self, storage: TransactionStorage, tx: Transacti
return True

def _checkdatasig_count_rule(self, tx: Transaction) -> bool:
"""Check whether a tx became invalid because the reorg changed the checkdatasig feature activation state."""
"""Check whether a tx became invalid because of the count checkdatasig feature."""
from hathor.verification.vertex_verifier import VertexVerifier

# We check all txs regardless of the feature state, because this rule
# already prohibited mempool txs before the block feature activation.
# Any exception in the sigops verification will be considered
# a fail and the tx will be removed from the mempool.
try:
Expand Down
1 change: 1 addition & 0 deletions hathor_cli/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
nc_calls_sorter=nc_calls_sorter,
feature_service=self.feature_service,
nc_exec_fail_trace=self._args.nc_exec_fail_trace,
tx_storage=tx_storage,
)

if self._args.x_enable_event_queue or self._args.enable_event_queue:
Expand Down
Loading