Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions hathor/consensus/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def create_context(self) -> ConsensusAlgorithmContext:

@cpu.profiler(key=lambda self, base: 'consensus!{}'.format(base.hash.hex()))
def update(self, base: BaseTransaction) -> None:
assert base.storage is not None
assert base.storage.is_only_valid_allowed()
try:
self._unsafe_update(base)
except Exception:
Expand Down Expand Up @@ -107,11 +109,16 @@ def _unsafe_update(self, base: BaseTransaction) -> None:
if new_best_height < best_height:
self.log.warn('height decreased, re-checking mempool', prev_height=best_height, new_height=new_best_height,
prev_block_tip=best_tip.hex(), new_block_tip=new_best_tip.hex())
to_remove = storage.get_transactions_that_became_invalid()
# XXX: this method will mark as INVALID all transactions in the mempool that became invalid because of a
# reward lock
to_remove = storage.compute_transactions_that_became_invalid()
if to_remove:
self.log.warn('some transactions on the mempool became invalid and will be removed',
count=len(to_remove))
storage.remove_transactions(to_remove)
# XXX: because transactions in `to_remove` are marked as invalid, we need this context to be able to
# remove them
with storage.allow_invalid_context():
storage.remove_transactions(to_remove)
for tx_removed in to_remove:
context.pubsub.publish(HathorEvents.CONSENSUS_TX_REMOVED, tx_hash=tx_removed.hash)

Expand Down
9 changes: 6 additions & 3 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,9 @@ def _initialize_components(self) -> None:
# self.start_profiler()
if self._full_verification:
self.log.debug('reset all metadata')
for tx in self.tx_storage.get_all_transactions():
tx.reset_metadata()
with self.tx_storage.allow_partially_validated_context():
for tx in self.tx_storage.get_all_transactions():
tx.reset_metadata()

self.log.debug('load blocks and transactions')
for tx in self.tx_storage._topological_sort_dfs():
Expand Down Expand Up @@ -459,9 +460,11 @@ def _initialize_components(self) -> None:
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
if self.tx_storage.indexes.deps is not None:
self.sync_v2_step_validations([tx])
self.tx_storage.save_transaction(tx, only_metadata=True)
else:
assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification)
self.tx_storage.save_transaction(tx, only_metadata=True)
with self.tx_storage.allow_partially_validated_context():
self.tx_storage.save_transaction(tx, only_metadata=True)
else:
# TODO: deal with invalid tx
if not tx_meta.validation.is_final():
Expand Down
47 changes: 20 additions & 27 deletions hathor/transaction/base_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
TxValidationError,
WeightError,
)
from hathor.transaction.transaction_metadata import TransactionMetadata
from hathor.transaction.transaction_metadata import TransactionMetadata, ValidationState
from hathor.transaction.util import VerboseCallback, int_to_bytes, unpack, unpack_len
from hathor.util import classproperty

Expand Down Expand Up @@ -482,34 +482,34 @@ def can_validate_full(self) -> bool:
return True
return all_exist and all_valid

def set_validation(self, validation: ValidationState) -> None:
""" This method will set the internal validation state AND the appropriate voided_by marker.

NOTE: THIS METHOD WILL NOT SAVE THE TRANSACTION
"""
meta = self.get_metadata()
meta.validation = validation
if validation.is_fully_connected():
self._unmark_partially_validated()
else:
self._mark_partially_validated()

def validate_checkpoint(self, checkpoints: List[Checkpoint]) -> bool:
""" Run checkpoint validations and update the validation state.

If no exception is raised, the ValidationState will end up as `CHECKPOINT` and return `True`.
"""
from hathor.transaction.transaction_metadata import ValidationState

meta = self.get_metadata()

self.verify_checkpoint(checkpoints)

meta.validation = ValidationState.CHECKPOINT
self.mark_partially_validated()
self.set_validation(ValidationState.CHECKPOINT)
return True

def validate_basic(self, skip_block_weight_verification: bool = False) -> bool:
""" Run basic validations (all that are possible without dependencies) and update the validation state.

If no exception is raised, the ValidationState will end up as `BASIC` and return `True`.
"""
from hathor.transaction.transaction_metadata import ValidationState

meta = self.get_metadata()

self.verify_basic(skip_block_weight_verification=skip_block_weight_verification)

meta.validation = ValidationState.BASIC
self.mark_partially_validated()
self.set_validation(ValidationState.BASIC)
return True

def validate_full(self, skip_block_weight_verification: bool = False, sync_checkpoints: bool = False,
Expand All @@ -523,9 +523,7 @@ def validate_full(self, skip_block_weight_verification: bool = False, sync_check
meta = self.get_metadata()
# skip full validation when it is a checkpoint
if meta.validation.is_checkpoint():
meta.validation = ValidationState.CHECKPOINT_FULL
# at last, remove the partially validated mark
self.unmark_partially_validated()
self.set_validation(ValidationState.CHECKPOINT_FULL)
return True

# XXX: in some cases it might be possible that this transaction is verified by a checkpoint but we went
Expand All @@ -536,16 +534,11 @@ def validate_full(self, skip_block_weight_verification: bool = False, sync_check
self.verify_basic(skip_block_weight_verification=skip_block_weight_verification)

self.verify(reject_locked_reward=reject_locked_reward)
if sync_checkpoints:
meta.validation = ValidationState.CHECKPOINT_FULL
else:
meta.validation = ValidationState.FULL

# at last, remove the partially validated mark
self.unmark_partially_validated()
validation = ValidationState.CHECKPOINT_FULL if sync_checkpoints else ValidationState.FULL
self.set_validation(validation)
return True

def mark_partially_validated(self) -> None:
def _mark_partially_validated(self) -> None:
""" This function is used to add the partially-validated mark from the voided-by metadata.

It is idempotent: calling it multiple time has the same effect as calling it once. But it must only be called
Expand All @@ -555,7 +548,7 @@ def mark_partially_validated(self) -> None:
assert not tx_meta.validation.is_fully_connected()
tx_meta.add_voided_by(settings.PARTIALLY_VALIDATED_ID)

def unmark_partially_validated(self) -> None:
def _unmark_partially_validated(self) -> None:
""" This function is used to remove the partially-validated mark from the voided-by metadata.

It is idempotent: calling it multiple time has the same effect as calling it once. But it must only be called
Expand Down
8 changes: 5 additions & 3 deletions hathor/transaction/storage/cache_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from collections import OrderedDict
from typing import Any, Optional, Set
from typing import Any, Iterator, Optional, Set

from twisted.internet import threads

Expand Down Expand Up @@ -208,9 +208,11 @@ def _get_transaction(self, hash_bytes: bytes) -> BaseTransaction:
assert tx is not None
return tx

def get_all_transactions(self):
def _get_all_transactions(self) -> Iterator[BaseTransaction]:
self._flush_to_storage(self.dirty_txs.copy())
for tx in self.store.get_all_transactions():
# XXX: explicitly use _get_all_transaction instead of get_all_transactions because there will already be a
# TransactionCacheStorage.get_all_transactions outer method
for tx in self.store._get_all_transactions():
tx.storage = self
self._save_to_weakref(tx)
yield tx
Expand Down
4 changes: 4 additions & 0 deletions hathor/transaction/storage/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ class PartialMigrationError(HathorError):

class OutOfOrderMigrationError(HathorError):
"""A migration was run before another that was before it"""


class TransactionNotInAllowedScopeError(TransactionDoesNotExist):
"""You are trying to get a transaction that is not allowed in the current scope, treated as non-existent"""
2 changes: 1 addition & 1 deletion hathor/transaction/storage/memory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def _get_transaction(self, hash_bytes: bytes) -> BaseTransaction:
else:
raise TransactionDoesNotExist(hash_bytes.hex())

def get_all_transactions(self, *, include_partial: bool = False) -> Iterator[BaseTransaction]:
def _get_all_transactions(self) -> Iterator[BaseTransaction]:
for tx in self.transactions.values():
tx = self._clone(tx)
if tx.hash in self.metadata:
Expand Down
7 changes: 2 additions & 5 deletions hathor/transaction/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def _get_transaction(self, hash_bytes: bytes) -> 'BaseTransaction':
if not tx:
raise TransactionDoesNotExist(hash_bytes.hex())

assert tx._metadata is not None
assert tx.hash == hash_bytes

self._save_to_weakref(tx)
Expand All @@ -146,7 +147,7 @@ def _get_tx(self, hash_bytes: bytes, tx_data: bytes) -> 'BaseTransaction':
self._save_to_weakref(tx)
return tx

def get_all_transactions(self, *, include_partial: bool = False) -> Iterator['BaseTransaction']:
def _get_all_transactions(self) -> Iterator['BaseTransaction']:
tx: Optional['BaseTransaction']

items = self._db.iteritems(self._cf_tx)
Expand All @@ -163,10 +164,6 @@ def get_all_transactions(self, *, include_partial: bool = False) -> Iterator['Ba
tx = self._get_tx(hash_bytes, tx_data)

assert tx is not None
if not include_partial:
assert tx._metadata is not None
if not tx._metadata.validation.is_fully_connected():
continue
yield tx

def is_empty(self) -> bool:
Expand Down
88 changes: 81 additions & 7 deletions hathor/transaction/storage/transaction_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import hashlib
from abc import ABC, abstractmethod, abstractproperty
from collections import defaultdict, deque
from contextlib import AbstractContextManager
from threading import Lock
from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, Type, cast
from weakref import WeakValueDictionary
Expand All @@ -28,8 +29,13 @@
from hathor.pubsub import PubSubManager
from hathor.transaction.base_transaction import BaseTransaction
from hathor.transaction.block import Block
from hathor.transaction.storage.exceptions import TransactionDoesNotExist, TransactionIsNotABlock
from hathor.transaction.storage.exceptions import (
TransactionDoesNotExist,
TransactionIsNotABlock,
TransactionNotInAllowedScopeError,
)
from hathor.transaction.storage.migrations import BaseMigration, MigrationState, add_min_height_metadata
from hathor.transaction.storage.tx_allow_scope import TxAllowScope, tx_allow_context
from hathor.transaction.transaction import Transaction
from hathor.transaction.transaction_metadata import TransactionMetadata
from hathor.util import not_none
Expand Down Expand Up @@ -94,6 +100,9 @@ def __init__(self) -> None:
# This is a global lock used to prevent concurrent access when getting the tx lock in the dict above
self._weakref_lock: Lock = Lock()

# Flag to allow/disallow partially validated vertices.
self.allow_scope: TxAllowScope = TxAllowScope.VALID

# Cache for the best block tips
# This cache is updated in the consensus algorithm.
self._best_block_tips_cache: Optional[List[bytes]] = None
Expand Down Expand Up @@ -330,6 +339,43 @@ def get_transaction_from_weakref(self, hash_bytes: bytes) -> Optional[BaseTransa
return None
return self._tx_weakref.get(hash_bytes, None)

# TODO: check if the method bellow is currently needed
def allow_only_valid_context(self) -> AbstractContextManager[None]:
"""This method is used to temporarily reset the storage back to only allow valid transactions.

The implementation will OVERRIDE the current scope to allowing only valid transactions on the observed
storage.
"""
return tx_allow_context(self, allow_scope=TxAllowScope.VALID)

def allow_partially_validated_context(self) -> AbstractContextManager[None]:
"""This method is used to temporarily make the storage allow partially validated transactions.

The implementation will INCLUDE allowing partially valid transactions to the current allow scope.
"""
new_allow_scope = self.allow_scope | TxAllowScope.PARTIAL
return tx_allow_context(self, allow_scope=new_allow_scope)

def allow_invalid_context(self) -> AbstractContextManager[None]:
"""This method is used to temporarily make the storage allow invalid transactions.

The implementation will INCLUDE allowing invalid transactions to the current allow scope.
"""
new_allow_scope = self.allow_scope | TxAllowScope.INVALID
return tx_allow_context(self, allow_scope=new_allow_scope)

def is_only_valid_allowed(self) -> bool:
"""Whether only valid transactions are allowed to be returned/accepted by the storage, the default state."""
return self.allow_scope is TxAllowScope.VALID

def is_partially_validated_allowed(self) -> bool:
"""Whether partially validated transactions are allowed to be returned/accepted by the storage."""
return TxAllowScope.PARTIAL in self.allow_scope

def is_invalid_allowed(self) -> bool:
"""Whether invalid transactions are allowed to be returned/accepted by the storage."""
return TxAllowScope.INVALID in self.allow_scope

def _enable_weakref(self) -> None:
""" Weakref should never be disabled unless you know exactly what you are doing.
"""
Expand All @@ -354,7 +400,7 @@ def save_transaction(self: 'TransactionStorage', tx: BaseTransaction, *, only_me
self.pre_save_validation(tx, meta)

def pre_save_validation(self, tx: BaseTransaction, tx_meta: TransactionMetadata) -> None:
""" Must be run before every save, only raises AssertionError.
""" Must be run before every save, will raise AssertionError or TransactionNotInAllowedScopeError

A failure means there is a bug in the code that allowed the condition to reach the "save" code. This is a last
second measure to prevent persisting a bad transaction/metadata.
Expand All @@ -365,13 +411,32 @@ def pre_save_validation(self, tx: BaseTransaction, tx_meta: TransactionMetadata)
assert tx.hash is not None
assert tx_meta.hash is not None
assert tx.hash == tx_meta.hash, f'{tx.hash.hex()} != {tx_meta.hash.hex()}'
self._validate_partial_marker_consistency(tx_meta)
self._validate_transaction_in_scope(tx)

def post_get_validation(self, tx: BaseTransaction) -> None:
""" Must be run before every save, will raise AssertionError or TransactionNotInAllowedScopeError

A failure means there is a bug in the code that allowed the condition to reach the "get" code. This is a last
second measure to prevent getting a transaction while using the wrong scope.
"""
tx_meta = tx.get_metadata()
self._validate_partial_marker_consistency(tx_meta)
self._validate_transaction_in_scope(tx)

def _validate_partial_marker_consistency(self, tx_meta: TransactionMetadata) -> None:
voided_by = tx_meta.get_frozen_voided_by()
# XXX: PARTIALLY_VALIDATED_ID must be included if the tx is fully connected and must not be included otherwise
has_partially_validated_marker = settings.PARTIALLY_VALIDATED_ID in voided_by
validation_is_fully_connected = tx_meta.validation.is_fully_connected()
assert (not has_partially_validated_marker) == validation_is_fully_connected, \
'Inconsistent ValidationState and voided_by'

def _validate_transaction_in_scope(self, tx: BaseTransaction) -> None:
if not self.allow_scope.is_allowed(tx):
tx_meta = tx.get_metadata()
raise TransactionNotInAllowedScopeError(tx.hash_hex, self.allow_scope.name, tx_meta.validation.name)

@abstractmethod
def remove_transaction(self, tx: BaseTransaction) -> None:
"""Remove the tx.
Expand Down Expand Up @@ -483,6 +548,7 @@ def get_transaction(self, hash_bytes: bytes) -> BaseTransaction:
tx = self._get_transaction(hash_bytes)
else:
tx = self._get_transaction(hash_bytes)
self.post_get_validation(tx)
return tx

def get_metadata(self, hash_bytes: bytes) -> Optional[TransactionMetadata]:
Expand All @@ -497,10 +563,16 @@ def get_metadata(self, hash_bytes: bytes) -> Optional[TransactionMetadata]:
except TransactionDoesNotExist:
return None

def get_all_transactions(self) -> Iterator[BaseTransaction]:
"""Return all vertices (transactions and blocks) within the allowed scope.
"""
for tx in self._get_all_transactions():
if self.allow_scope.is_allowed(tx):
yield tx

@abstractmethod
def get_all_transactions(self, *, include_partial: bool = False) -> Iterator[BaseTransaction]:
# TODO: verify the following claim:
"""Return all transactions that are not blocks.
def _get_all_transactions(self) -> Iterator[BaseTransaction]:
"""Internal implementation that iterates over all transactions/blocks.
"""
raise NotImplementedError

Expand Down Expand Up @@ -950,14 +1022,14 @@ def iter_mempool_from_best_index(self) -> Iterator[Transaction]:
else:
yield from self.iter_mempool_from_tx_tips()

def get_transactions_that_became_invalid(self) -> List[BaseTransaction]:
def compute_transactions_that_became_invalid(self) -> List[BaseTransaction]:
""" This method will look for transactions in the mempool that have became invalid due to the reward lock.
"""
from hathor.transaction.transaction_metadata import ValidationState
to_remove: List[BaseTransaction] = []
for tx in self.iter_mempool_from_best_index():
if tx.is_spent_reward_locked():
tx.get_metadata().validation = ValidationState.INVALID
tx.set_validation(ValidationState.INVALID)
to_remove.append(tx)
return to_remove

Expand Down Expand Up @@ -1001,6 +1073,8 @@ def reset_indexes(self) -> None:
"""Reset all indexes. This function should not be called unless you know what you are doing."""
assert self.indexes is not None, 'Cannot reset indexes because they have not been enabled.'
self.indexes.force_clear_all()
self.update_best_block_tips_cache(None)
self._all_tips_cache = None

def remove_cache(self) -> None:
"""Remove all caches in case we don't need it."""
Expand Down
Loading