diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py index 806b4c9b6..0b5e2df61 100644 --- a/hathor/cli/run_node.py +++ b/hathor/cli/run_node.py @@ -299,7 +299,6 @@ def register_resources(self, args: Namespace) -> None: GraphvizNeighboursResource, PushTxResource, SubmitBlockResource, - TipsHistogramResource, TransactionAccWeightResource, TransactionResource, TxParentsResource, @@ -374,7 +373,6 @@ def register_resources(self, args: Namespace) -> None: PushTxResource(self.manager, args.max_output_script_size, args.allow_non_standard_script), root), (b'graphviz', graphviz, root), - (b'tips-histogram', TipsHistogramResource(self.manager), root), (b'transaction', TransactionResource(self.manager), root), (b'transaction_acc_weight', TransactionAccWeightResource(self.manager), root), (b'dashboard_tx', DashboardTransactionResource(self.manager), root), diff --git a/hathor/consensus.py b/hathor/consensus.py index 289f7e659..80e655037 100644 --- a/hathor/consensus.py +++ b/hathor/consensus.py @@ -184,6 +184,7 @@ def update_voided_info(self, block: Block) -> None: meta = block.get_metadata() if not meta.voided_by: storage._best_block_tips = [block.hash] + storage.add_new_to_block_height_index(meta.height, block.hash) # The following assert must be true, but it is commented out for performance reasons. # assert len(storage.get_best_block_tips(skip_cache=True)) == 1 else: @@ -242,8 +243,17 @@ def update_voided_info(self, block: Block) -> None: meta = block.get_metadata() if not meta.voided_by: storage._best_block_tips = [block.hash] + self.log.debug('index new winner block', height=meta.height, block=block.hash_hex) + # We update the height cache index with the new winner chain + storage.update_block_height_cache_new_chain(meta.height, block) else: storage._best_block_tips = [blk.hash for blk in heads] + # XXX Is it safe to select one of the heads? 
+ best_block = heads[0] + assert best_block.hash is not None + best_meta = best_block.get_metadata() + self.log.debug('index previous best block', height=best_meta.height, block=best_block.hash_hex) + storage.add_new_to_block_height_index(best_meta.height, best_block.hash) # Uncomment the following lines to check that the cache update is working properly. # You shouldn't run this test in production because it dampens performance. @@ -286,7 +296,7 @@ def update_voided_by_from_parents(self, block: Block) -> bool: else: meta.voided_by = voided_by.copy() block.storage.save_transaction(block, only_metadata=True) - block.storage._del_from_cache(block, relax_assert=True) # XXX: accessing private method + block.storage.del_from_indexes(block, relax_assert=True) return True return False @@ -735,7 +745,7 @@ def update_voided_info(self, tx: Transaction) -> None: if voided_by: meta.voided_by = voided_by.copy() tx.storage.save_transaction(tx, only_metadata=True) - tx.storage._del_from_cache(tx) # XXX: accessing private method + tx.storage.del_from_indexes(tx) # Check conflicts of the transactions voiding us. for h in voided_by: @@ -853,7 +863,7 @@ def remove_voided_by(self, tx: Transaction, voided_hash: bytes) -> bool: tx2.storage.save_transaction(tx2, only_metadata=True) if not meta.voided_by: meta.voided_by = None - tx.storage._add_to_cache(tx2) # XXX: accessing private method + tx.storage.add_to_indexes(tx2) from hathor.transaction import Transaction for tx2 in check_list: @@ -912,7 +922,7 @@ def add_voided_by(self, tx: Transaction, voided_hash: bytes) -> bool: # All voided transactions with conflicts must have their accumulated weight calculated. 
tx2.update_accumulated_weight(save_file=False) tx2.storage.save_transaction(tx2, only_metadata=True) - tx2.storage._del_from_cache(tx2, relax_assert=True) # XXX: accessing private method + tx2.storage.del_from_indexes(tx2, relax_assert=True) for tx2 in check_list: self.check_conflicts(tx2) diff --git a/hathor/daa.py b/hathor/daa.py index 6e5672527..7fd79afc3 100644 --- a/hathor/daa.py +++ b/hathor/daa.py @@ -23,6 +23,8 @@ from math import log from typing import TYPE_CHECKING, List +from structlog import get_logger + from hathor.conf import HathorSettings from hathor.profiler import get_cpu_profiler from hathor.util import iwindows @@ -30,6 +32,7 @@ if TYPE_CHECKING: from hathor.transaction import Block, Transaction +logger = get_logger() settings = HathorSettings() cpu = get_cpu_profiler() @@ -49,6 +52,7 @@ class TestMode(IntFlag): def _set_test_mode(mode: TestMode) -> None: global TEST_MODE + logger.debug('change DAA test mode', from_mode=TEST_MODE.name, to_mode=mode.name) TEST_MODE = mode diff --git a/hathor/manager.py b/hathor/manager.py index 0973df286..3a2fd0a19 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -17,7 +17,7 @@ import sys import time from enum import Enum -from typing import Any, Iterator, List, NamedTuple, Optional, Tuple, Union +from typing import Any, Iterable, Iterator, List, NamedTuple, Optional, Tuple, Union from structlog import get_logger from twisted.internet import defer @@ -30,7 +30,7 @@ from hathor.checkpoint import Checkpoint from hathor.conf import HathorSettings from hathor.consensus import ConsensusAlgorithm -from hathor.exception import InvalidNewTransaction +from hathor.exception import HathorError, InvalidNewTransaction from hathor.indexes import TokensIndex, WalletIndex from hathor.mining import BlockTemplate, BlockTemplates from hathor.p2p.peer_discovery import PeerDiscovery @@ -41,6 +41,7 @@ from hathor.transaction import BaseTransaction, Block, MergeMinedBlock, Transaction, TxVersion, sum_weights from 
hathor.transaction.exceptions import TxValidationError from hathor.transaction.storage import TransactionStorage +from hathor.transaction.storage.exceptions import TransactionDoesNotExist from hathor.wallet import BaseWallet settings = HathorSettings() @@ -213,7 +214,7 @@ def start(self) -> None: 'that was stopped in the middle. The storage is not reliable anymore and, because of that, ' 'you must initialize with a full verification again or remove your storage and do a full sync.' ) - sys.exit() + sys.exit(-1) # If self.tx_storage.is_running_manager() is True, the last time the node was running it had a sudden crash # because of that, we must run a full verification because some storage data might be wrong. @@ -224,7 +225,7 @@ def start(self) -> None: 'The storage is not reliable anymore and, because of that, so you must run a full verification ' 'or remove your storage and do a full sync.' ) - sys.exit() + sys.exit(-1) self.state = self.NodeState.INITIALIZING self.pubsub.publish(HathorEvents.MANAGER_ON_START) @@ -336,10 +337,22 @@ def _initialize_components(self) -> None: self.tx_storage.pre_init() + # Checkpoints as {height: hash} + checkpoint_heights = {} + for cp in self.checkpoints: + checkpoint_heights[cp.height] = cp.hash + # self.start_profiler() + if self._full_verification: + self.log.debug('reset all metadata') + for tx in self.tx_storage.get_all_transactions(): + tx.reset_metadata() self.log.debug('load blocks and transactions') for tx in self.tx_storage._topological_sort(): + if self._full_verification: + tx.update_initial_metadata() + assert tx.hash is not None tx_meta = tx.get_metadata() @@ -367,12 +380,34 @@ def _initialize_components(self) -> None: skip_block_weight_verification = False try: - assert self.on_new_tx( - tx, - quiet=True, - fails_silently=False, - skip_block_weight_verification=skip_block_weight_verification - ) + if self._full_verification: + # TODO: deal with invalid tx + if self.tx_storage.is_tx_needed(tx.hash): + assert 
isinstance(tx, Transaction) + tx._height_cache = self.tx_storage.needed_index_height(tx.hash) + if tx.can_validate_full(): + self.tx_storage.add_to_indexes(tx) + assert tx.validate_full(skip_block_weight_verification=skip_block_weight_verification) + self.consensus_algorithm.update(tx) + self.tx_storage.update_tx_tips(tx) + self.step_validations([tx]) + if tx.is_block: + self.tx_storage.add_to_parent_blocks_index(tx.hash) + else: + assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification) + self.tx_storage.add_to_deps_index(tx.hash, tx.get_all_dependencies()) + self.tx_storage.add_needed_deps(tx) + self.tx_storage.save_transaction(tx, only_metadata=True) + else: + # TODO: deal with invalid tx + if not tx_meta.validation.is_final(): + assert tx_meta.validation.is_at_least_basic(), f'invalid: {tx.hash_hex}' + self.tx_storage.add_needed_deps(tx) + elif tx.is_transaction and tx_meta.first_block is None and not tx_meta.voided_by: + self.tx_storage.update_tx_tips(tx) + elif tx.is_block: + self.tx_storage.add_to_parent_blocks_index(tx.hash) + self.tx_storage.add_to_indexes(tx) except (InvalidNewTransaction, TxValidationError): self.log.error('unexpected error when initializing', tx=tx, exc_info=True) raise @@ -380,12 +415,63 @@ def _initialize_components(self) -> None: if tx.is_block: block_count += 1 + # this works because blocks on the best chain are iterated from lower to higher height + assert tx.hash is not None + assert tx_meta.validation.is_at_least_basic() + if not tx_meta.voided_by: + # XXX: this might not be needed when making a full init because the consensus should already have + self.tx_storage.add_new_to_block_height_index(tx_meta.height, tx.hash) + + # Check if it's a checkpoint block + if tx_meta.height in checkpoint_heights: + if tx.hash == checkpoint_heights[tx_meta.height]: + del checkpoint_heights[tx_meta.height] + else: + # If the hash is different from checkpoint hash, we stop the node + self.log.error('Error 
initializing the node. Checkpoint validation error.') + sys.exit() + else: + tx_count += 1 + if time.time() - t2 > 1: dt = hathor.util.LogDuration(time.time() - t2) self.log.warn('tx took too long to load', tx=tx.hash_hex, dt=dt) + # we have to have a best_block by now + # assert best_block is not None + self.log.debug('done loading transactions') + # Check if all checkpoints in database are ok + my_best_height = self.tx_storage.get_height_best_block() + if checkpoint_heights: + # If I have checkpoints that were not validated I must check if they are all in a height I still don't have + first = min(list(checkpoint_heights.keys())) + if first <= my_best_height: + # If the height of the first checkpoint not validated is lower than the height of the best block + # Then it's missing this block + self.log.error('Error initializing the node. Checkpoint validation error.') + sys.exit() + + # restart all validations possible + deps_size = self.tx_storage.count_deps_index() + if deps_size > 0: + self.log.debug('run pending validations', deps_size=deps_size) + depended_final_txs: List[BaseTransaction] = [] + for tx_hash in self.tx_storage.iter_deps_index(): + if not self.tx_storage.transaction_exists(tx_hash): + continue + tx = self.tx_storage.get_transaction(tx_hash) + if tx.get_metadata().validation.is_final(): + depended_final_txs.append(tx) + self.step_validations(depended_final_txs) + new_deps_size = self.tx_storage.count_deps_index() + self.log.debug('pending validations finished', changes=deps_size - new_deps_size) + + best_height = self.tx_storage.get_height_best_block() + if best_height != h: + self.log.warn('best height doesn\'t match', best_height=best_height, max_height=h) + # self.stop_profiler(save_to='profiles/initializing.prof') self.state = self.NodeState.READY tdt = hathor.util.LogDuration(t2 - t0) @@ -587,33 +673,6 @@ def get_tokens_issued_per_block(self, height: int) -> int: """Return the number of tokens issued (aka reward) per block of a given 
height.""" return daa.get_tokens_issued_per_block(height) - def validate_new_tx(self, tx: BaseTransaction, skip_block_weight_verification: bool = False) -> bool: - """ Process incoming transaction during initialization. - These transactions came only from storage. - """ - assert tx.hash is not None - - if self.state == self.NodeState.INITIALIZING: - if tx.is_genesis: - return True - - else: - if tx.is_genesis: - raise InvalidNewTransaction('Genesis? {}'.format(tx.hash_hex)) - - now = self.reactor.seconds() - if tx.timestamp - now > settings.MAX_FUTURE_TIMESTAMP_ALLOWED: - raise InvalidNewTransaction('Ignoring transaction in the future {} (timestamp={}, now={})'.format( - tx.hash_hex, tx.timestamp, now)) - - if self.state != self.NodeState.INITIALIZING and not tx.can_validate_full(): - raise InvalidNewTransaction('Cannot validate, missing dependency') - - # validate transaction, raises a TxValidationError if tx is not valid - tx.validate_full() - - return True - def submit_block(self, blk: Block, fails_silently: bool = True) -> bool: """Used by submit block from all mining APIs. """ @@ -633,59 +692,106 @@ def propagate_tx(self, tx: BaseTransaction, fails_silently: bool = True) -> bool assert tx.storage == self.tx_storage, 'Invalid tx storage' else: tx.storage = self.tx_storage - return self.on_new_tx(tx, fails_silently=fails_silently) + + return self.on_new_tx(tx, fails_silently=fails_silently, propagate_to_peers=True) @cpu.profiler('on_new_tx') def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = None, quiet: bool = False, fails_silently: bool = True, propagate_to_peers: bool = True, - skip_block_weight_verification: bool = False) -> bool: - """This method is called when any transaction arrive. - - If `fails_silently` is False, it may raise either InvalidNewTransaction or TxValidationError. 
- - :return: True if the transaction was accepted - :rtype: bool + skip_block_weight_verification: bool = False, partial: bool = False) -> bool: + """ New method for adding transactions or blocks that steps the validation state machine. """ assert tx.hash is not None - if self.state != self.NodeState.INITIALIZING: - if self.tx_storage.transaction_exists(tx.hash): - if not fails_silently: - raise InvalidNewTransaction('Transaction already exists {}'.format(tx.hash_hex)) - self.log.debug('on_new_tx(): Transaction already exists', tx=tx.hash_hex) - return False + if self.tx_storage.transaction_exists(tx.hash): + if not fails_silently: + raise InvalidNewTransaction('Transaction already exists {}'.format(tx.hash_hex)) + self.log.warn('on_new_tx(): Transaction already exists', tx=tx.hash_hex) + return False + + if tx.timestamp - self.reactor.seconds() > settings.MAX_FUTURE_TIMESTAMP_ALLOWED: + if not fails_silently: + raise InvalidNewTransaction('Ignoring transaction in the future {} (timestamp={})'.format( + tx.hash_hex, tx.timestamp)) + self.log.warn('on_new_tx(): Ignoring transaction in the future', tx=tx.hash_hex, + future_timestamp=tx.timestamp) + return False + + tx.storage = self.tx_storage + + try: + metadata = tx.get_metadata() + except TransactionDoesNotExist: + if not fails_silently: + raise InvalidNewTransaction('missing parent') + self.log.warn('on_new_tx(): missing parent', tx=tx.hash_hex) + return False - if self.state != self.NodeState.INITIALIZING or self._full_verification: + if metadata.validation.is_invalid(): + if not fails_silently: + raise InvalidNewTransaction('previously marked as invalid') + self.log.warn('on_new_tx(): previously marked as invalid', tx=tx.hash_hex) + return False + + # if partial=False (the default) we don't even try to partially validate transactions + if not partial or (metadata.validation.is_fully_connected() or tx.can_validate_full()): + if isinstance(tx, Transaction) and self.tx_storage.is_tx_needed(tx.hash): + 
tx._height_cache = self.tx_storage.needed_index_height(tx.hash) + + if not metadata.validation.is_valid(): + try: + tx.validate_full() + except HathorError as e: + if not fails_silently: + raise InvalidNewTransaction('full validation failed') from e + self.log.warn('on_new_tx(): full validation failed', tx=tx.hash_hex, exc_info=True) + return False + + # The method below adds the tx as a child of the parents + # This needs to be called right before the save because we were adding the children + # in the tx parents even if the tx was invalid (failing the verifications above) + # then I would have a children that was not in the storage + tx.update_initial_metadata() + self.tx_storage.save_transaction(tx, add_to_indexes=True) try: - assert self.validate_new_tx(tx, skip_block_weight_verification=skip_block_weight_verification) is True - except (InvalidNewTransaction, TxValidationError): - # Discard invalid Transaction/block. - self.log.debug('tx/block discarded', tx=tx, exc_info=True) + self.consensus_algorithm.update(tx) + except HathorError as e: + if not fails_silently: + raise InvalidNewTransaction('consensus update failed') from e + self.log.warn('on_new_tx(): consensus update failed', tx=tx.hash_hex) + return False + else: + self.tx_fully_validated(tx) + else: + if isinstance(tx, Block) and not tx.has_basic_block_parent(): + if not fails_silently: + raise InvalidNewTransaction('block parent needs to be at least basic-valid') + self.log.warn('on_new_tx(): block parent needs to be at least basic-valid', tx=tx.hash_hex) + return False + if not tx.validate_basic(): if not fails_silently: - raise + raise InvalidNewTransaction('basic validation failed') + self.log.warn('on_new_tx(): basic validation failed', tx=tx.hash_hex) return False - if self.state != self.NodeState.INITIALIZING: + # The method below adds the tx as a child of the parents + # This needs to be called right before the save because we were adding the children + # in the tx parents even if the tx was 
invalid (failing the verifications above) + # then I would have a children that was not in the storage + tx.update_initial_metadata() self.tx_storage.save_transaction(tx) - else: - self.tx_storage._add_to_cache(tx) - if self._full_verification: - tx.reset_metadata() - else: - # When doing a fast init, we don't update the consensus, so we must trust the data on the metadata - # For transactions, we don't store them on the tips index if they are voided - # We have to execute _add_to_cache before because _del_from_cache does not remove from all indexes - metadata = tx.get_metadata() - if not tx.is_block and metadata.voided_by: - self.tx_storage._del_from_cache(tx) - - if self.state != self.NodeState.INITIALIZING or self._full_verification: - try: - tx.update_initial_metadata() - self.consensus_algorithm.update(tx) - except Exception: - self.log.exception('unexpected error when processing tx', tx=tx) - self.tx_storage.remove_transaction(tx) - raise + self.tx_storage.add_to_deps_index(tx.hash, tx.get_all_dependencies()) + self.tx_storage.add_needed_deps(tx) + + if tx.is_transaction: + self.tx_storage.remove_from_needed_index(tx.hash) + + try: + self.step_validations([tx]) + except (AssertionError, HathorError) as e: + if not fails_silently: + raise InvalidNewTransaction('step validations failed') from e + self.log.warn('on_new_tx(): step validations failed', tx=tx.hash_hex) + return False if not quiet: ts_date = datetime.datetime.fromtimestamp(tx.timestamp) @@ -699,14 +805,51 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non # Propagate to our peers. self.connections.send_tx_to_peers(tx) - if self.wallet: - # TODO Remove it and use pubsub instead. - self.wallet.on_new_tx(tx) + return True + + def step_validations(self, txs: Iterable[BaseTransaction]) -> None: + """ Step all validations until none can be stepped anymore. 
+ """ + # cur_txs will be empty when there are no more new txs that reached full + # validation because of an initial trigger + for ready_tx in txs: + assert ready_tx.hash is not None + self.tx_storage.remove_ready_for_validation(ready_tx.hash) + for tx in map(self.tx_storage.get_transaction, self.tx_storage.next_ready_for_validation()): + assert tx.hash is not None + tx.update_initial_metadata() + try: + assert tx.validate_full() + except (AssertionError, HathorError): + # TODO + raise + else: + self.tx_storage.save_transaction(tx, only_metadata=True, add_to_indexes=True) + self.consensus_algorithm.update(tx) + # save and process its dependencies even if it became invalid + # because invalidation state also has to propagate to children + self.tx_storage.remove_ready_for_validation(tx.hash) + self.tx_fully_validated(tx) - # Publish to pubsub manager the new tx accepted + def tx_fully_validated(self, tx: BaseTransaction) -> None: + """ Handle operations that need to happen once the tx becomes fully validated. + + This might happen immediately after we receive the tx, if we have all dependencies + already. Or it might happen later. + """ + assert tx.hash is not None + + # Publish to pubsub manager the new tx accepted, now that it's full validated self.pubsub.publish(HathorEvents.NETWORK_NEW_TX_ACCEPTED, tx=tx) - return True + self.tx_storage.del_from_deps_index(tx.hash) + self.tx_storage.update_tx_tips(tx) + if tx.is_block: + self.tx_storage.add_to_parent_blocks_index(tx.hash) + + if self.wallet: + # TODO Remove it and use pubsub instead. 
+ self.wallet.on_new_tx(tx) def listen(self, description: str, use_ssl: Optional[bool] = None) -> None: endpoint = self.connections.listen(description, use_ssl) diff --git a/hathor/p2p/downloader.py b/hathor/p2p/downloader.py index 3cf2d6d58..93fd7e2d5 100644 --- a/hathor/p2p/downloader.py +++ b/hathor/p2p/downloader.py @@ -203,12 +203,12 @@ def on_new_tx(self, tx: 'BaseTransaction') -> None: """ This is called when a new transaction arrives. """ assert tx.hash is not None - self.log.debug('new tx', tx=tx.hash_hex) + self.log.debug('new tx/block', tx=tx.hash_hex) details = self.pending_transactions.get(tx.hash, None) if not details: # Something is wrong! It should never happen. - self.log.warn('new tx arrived but tx detail does not exist', tx=tx.hash_hex) + self.log.warn('new tx/block arrived but tx detail does not exist', tx=tx.hash_hex) return assert len(self.downloading_deque) > 0 diff --git a/hathor/p2p/manager.py b/hathor/p2p/manager.py index 37233405e..c9141f2ee 100644 --- a/hathor/p2p/manager.py +++ b/hathor/p2p/manager.py @@ -35,9 +35,9 @@ from hathor.transaction import BaseTransaction if TYPE_CHECKING: - from hathor.manager import HathorManager # noqa: F401 - from hathor.p2p.factory import HathorClientFactory, HathorServerFactory # noqa: F401 - from hathor.p2p.node_sync import NodeSyncTimestamp # noqa: F401 + from hathor.manager import HathorManager + from hathor.p2p.factory import HathorClientFactory, HathorServerFactory + from hathor.p2p.node_sync import NodeSyncTimestamp logger = get_logger() settings = HathorSettings() diff --git a/hathor/p2p/node_sync.py b/hathor/p2p/node_sync.py index 9ad841931..930dabea9 100644 --- a/hathor/p2p/node_sync.py +++ b/hathor/p2p/node_sync.py @@ -686,6 +686,10 @@ def handle_data(self, payload: str) -> None: assert tx.timestamp is not None self.requested_data_arrived(tx.timestamp) deferred.callback(tx) + elif self.manager.tx_storage.transaction_exists(tx.hash): + # transaction already added to the storage, ignore it + # 
XXX: maybe we could add a hash blacklist and punish peers propagating known bad txs + return else: # If we have not requested the data, it is a new transaction being propagated # in the network, thus, we propagate it as well. @@ -719,7 +723,7 @@ def get_data_key(self, hash_bytes: bytes) -> str: key = 'get-data-{}'.format(hash_bytes.hex()) return key - def remove_deferred(self, reason: 'Failure', hash_bytes: bytes) -> None: + def remove_deferred(self, reason: 'Failure', hash_bytes: bytes) -> None: """ Remove the deferred from the deferred_by_key Used when a requested tx deferred fails for some reason (not found, or timeout) """ diff --git a/hathor/transaction/base_transaction.py b/hathor/transaction/base_transaction.py index 03618168b..7be96ae61 100644 --- a/hathor/transaction/base_transaction.py +++ b/hathor/transaction/base_transaction.py @@ -331,15 +331,21 @@ def get_time_from_now(self, now: Optional[Any] = None) -> str: minutes, seconds = divmod(seconds, 60) return '{} days, {:02d}:{:02d}:{:02d}'.format(dt.days, hours, minutes, seconds) - def get_parents(self) -> Iterator['BaseTransaction']: + def get_parents(self, *, existing_only: bool = False) -> Iterator['BaseTransaction']: """Return an iterator of the parents :return: An iterator of the parents :rtype: Iter[BaseTransaction] """ + from hathor.transaction.storage.exceptions import TransactionDoesNotExist + for parent_hash in self.parents: assert self.storage is not None - yield self.storage.get_transaction(parent_hash) + try: + yield self.storage.get_transaction(parent_hash) + except TransactionDoesNotExist: + if not existing_only: + raise @property def is_genesis(self) -> bool: @@ -442,6 +448,9 @@ def can_validate_full(self) -> bool: """ Check if this transaction is ready to be fully validated, either all deps are full-valid or one is invalid. 
""" assert self.storage is not None + assert self.hash is not None + if self.is_genesis: + return True deps = self.get_all_dependencies() all_exist = True all_valid = True @@ -855,10 +864,11 @@ def update_initial_metadata(self) -> None: assert self.hash is not None assert self.storage is not None - for parent in self.get_parents(): + for parent in self.get_parents(existing_only=True): metadata = parent.get_metadata() - metadata.children.append(self.hash) - self.storage.save_transaction(parent, only_metadata=True) + if self.hash not in metadata.children: + metadata.children.append(self.hash) + self.storage.save_transaction(parent, only_metadata=True) def update_timestamp(self, now: int) -> None: """Update this tx's timestamp diff --git a/hathor/transaction/resources/__init__.py b/hathor/transaction/resources/__init__.py index 3c5897ad2..ad66008da 100644 --- a/hathor/transaction/resources/__init__.py +++ b/hathor/transaction/resources/__init__.py @@ -18,7 +18,6 @@ from hathor.transaction.resources.graphviz import GraphvizFullResource, GraphvizNeighboursResource from hathor.transaction.resources.mining import GetBlockTemplateResource, SubmitBlockResource from hathor.transaction.resources.push_tx import PushTxResource -from hathor.transaction.resources.tips_histogram import TipsHistogramResource from hathor.transaction.resources.transaction import TransactionResource from hathor.transaction.resources.transaction_confirmation import TransactionAccWeightResource from hathor.transaction.resources.tx_parents import TxParentsResource @@ -35,7 +34,6 @@ 'TransactionAccWeightResource', 'TransactionResource', 'DashboardTransactionResource', - 'TipsHistogramResource', 'TxParentsResource', 'ValidateAddressResource', ] diff --git a/hathor/transaction/resources/tips_histogram.py b/hathor/transaction/resources/tips_histogram.py deleted file mode 100644 index 0558c8c71..000000000 --- a/hathor/transaction/resources/tips_histogram.py +++ /dev/null @@ -1,149 +0,0 @@ -# Copyright 2021 
Hathor Labs -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json - -from twisted.web import resource - -from hathor.api_util import parse_get_arguments, set_cors -from hathor.cli.openapi_files.register import register_resource - -ARGS = ['begin', 'end'] - - -@register_resource -class TipsHistogramResource(resource.Resource): - """ Implements a web server API to return the tips in a timestamp interval. - Returns a list of timestamps and numbers of tips. - - You must run with option `--status `. 
- """ - isLeaf = True - - def __init__(self, manager): - self.manager = manager - - def render_GET(self, request): - """ Get request to /tips-histogram/ that return the number of tips between two timestamp - We expect two GET parameters: 'begin' and 'end' - - 'begin': int that indicates the beginning of the interval - 'end': int that indicates the end of the interval - - :rtype: string (json) - """ - request.setHeader(b'content-type', b'application/json; charset=utf-8') - set_cors(request, 'GET') - - parsed = parse_get_arguments(request.args, ARGS) - if not parsed['success']: - return json.dumps({ - 'success': False, - 'message': 'Missing parameter: {}'.format(parsed['missing']) - }).encode('utf-8') - - args = parsed['args'] - - # Get quantity for each - try: - begin = int(args['begin']) - except ValueError: - return json.dumps({ - 'success': False, - 'message': 'Invalid parameter, cannot convert to int: begin' - }).encode('utf-8') - - try: - end = int(args['end']) - except ValueError: - return json.dumps({ - 'success': False, - 'message': 'Invalid parameter, cannot convert to int: end' - }).encode('utf-8') - - v = [] - for timestamp in range(begin, end + 1): - tx_tips = self.manager.tx_storage.get_tx_tips(timestamp) - v.append((timestamp, len(tx_tips))) - - return json.dumps({'success': True, 'tips': v}).encode('utf-8') - - -TipsHistogramResource.openapi = { - '/tips-histogram': { - 'x-visibility': 'private', - 'get': { - 'tags': ['transaction'], - 'operationId': 'tips_histogram', - 'summary': 'Histogram of tips', - 'description': ('Returns a list of tuples (timestamp, quantity)' - 'for each timestamp in the requested interval'), - 'parameters': [ - { - 'name': 'begin', - 'in': 'query', - 'description': 'Beggining of the timestamp interval', - 'required': True, - 'schema': { - 'type': 'int' - } - }, - { - 'name': 'end', - 'in': 'query', - 'description': 'End of the timestamp interval', - 'required': True, - 'schema': { - 'type': 'int' - } - } - ], - 'responses': { 
- '200': { - 'description': 'Success', - 'content': { - 'application/json': { - 'examples': { - 'success': { - 'summary': 'Success', - 'value': [ - [ - 1547163020, - 1 - ], - [ - 1547163021, - 4 - ], - [ - 1547163022, - 2 - ] - ] - }, - 'error': { - 'summary': 'Invalid parameter', - 'value': { - 'success': False, - 'message': 'Missing parameter: begin' - } - } - } - } - } - } - } - } - } -} diff --git a/hathor/transaction/storage/binary_storage.py b/hathor/transaction/storage/binary_storage.py index c8a801e80..fdcec9a31 100644 --- a/hathor/transaction/storage/binary_storage.py +++ b/hathor/transaction/storage/binary_storage.py @@ -59,8 +59,8 @@ def remove_transaction(self, tx): except FileNotFoundError: pass - def save_transaction(self, tx, *, only_metadata=False): - super().save_transaction(tx, only_metadata=only_metadata) + def save_transaction(self, tx, *, only_metadata=False, add_to_indexes=False): + super().save_transaction(tx, only_metadata=only_metadata, add_to_indexes=add_to_indexes) self._save_transaction(tx, only_metadata=only_metadata) self._save_to_weakref(tx) diff --git a/hathor/transaction/storage/block_height_index.py b/hathor/transaction/storage/block_height_index.py new file mode 100644 index 000000000..19b1aa479 --- /dev/null +++ b/hathor/transaction/storage/block_height_index.py @@ -0,0 +1,62 @@ +# Copyright 2021 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from typing import List, Optional, Tuple + +from hathor.transaction.genesis import BLOCK_GENESIS +from hathor.util import not_none + +BLOCK_GENESIS_HASH: bytes = not_none(BLOCK_GENESIS.hash) + + +class BlockHeightIndex: + """Store the block hash for each given height + """ + def __init__(self) -> None: + self._index: List[bytes] = [BLOCK_GENESIS_HASH] + + def add(self, height: int, block_hash: bytes, *, can_reorg: bool = False) -> None: + """Add new element to the cache. Must not be called for re-orgs. + """ + if len(self._index) < height: + raise ValueError(f'parent hash required (current height: {len(self._index)}, new height: {height})') + elif len(self._index) == height: + self._index.append(block_hash) + elif self._index[height] != block_hash: + if can_reorg: + del self._index[height:] + self._index.append(block_hash) + else: + raise ValueError('adding would cause a re-org, use can_reorg=True to accept re-orgs') + else: + # nothing to do (there are more blocks, but the block at height currently matches the added block) + pass + + def get(self, height: int) -> Optional[bytes]: + """ Return the block hash for the given height, or None if it is not set. 
+ """ + if len(self._index) <= height: + return None + return self._index[height] + + def get_tip(self) -> bytes: + """ Return the best block hash, it returns the genesis when there is no other block + """ + return self._index[-1] + + def get_height_tip(self) -> Tuple[int, bytes]: + """ Return the best block height and hash, it returns the genesis when there is no other block + """ + height = len(self._index) - 1 + return height, self._index[height] diff --git a/hathor/transaction/storage/cache_storage.py b/hathor/transaction/storage/cache_storage.py index cb808da01..3c3afeb1f 100644 --- a/hathor/transaction/storage/cache_storage.py +++ b/hathor/transaction/storage/cache_storage.py @@ -119,12 +119,13 @@ def remove_transaction(self, tx: BaseTransaction) -> None: self.dirty_txs.discard(tx.hash) self._remove_from_weakref(tx) - def save_transaction(self, tx: BaseTransaction, *, only_metadata: bool = False) -> None: + def save_transaction(self, tx: BaseTransaction, *, only_metadata: bool = False, + add_to_indexes: bool = False) -> None: self._save_transaction(tx) self._save_to_weakref(tx) # call super which adds to index if needed - super().save_transaction(tx, only_metadata=only_metadata) + super().save_transaction(tx, only_metadata=only_metadata, add_to_indexes=add_to_indexes) def get_all_genesis(self) -> Set[BaseTransaction]: return self.store.get_all_genesis() diff --git a/hathor/transaction/storage/compact_storage.py b/hathor/transaction/storage/compact_storage.py index e97dfc495..1048531b6 100644 --- a/hathor/transaction/storage/compact_storage.py +++ b/hathor/transaction/storage/compact_storage.py @@ -73,8 +73,9 @@ def remove_transaction(self, tx: 'BaseTransaction') -> None: except FileNotFoundError: pass - def save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False) -> None: - super().save_transaction(tx, only_metadata=only_metadata) + def save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False, + add_to_indexes: bool = 
False) -> None: + super().save_transaction(tx, only_metadata=only_metadata, add_to_indexes=add_to_indexes) self._save_transaction(tx, only_metadata=only_metadata) self._save_to_weakref(tx) diff --git a/hathor/transaction/storage/memory_storage.py b/hathor/transaction/storage/memory_storage.py index 5a4f350be..33f41af19 100644 --- a/hathor/transaction/storage/memory_storage.py +++ b/hathor/transaction/storage/memory_storage.py @@ -48,8 +48,9 @@ def remove_transaction(self, tx: BaseTransaction) -> None: self.transactions.pop(tx.hash, None) self.metadata.pop(tx.hash, None) - def save_transaction(self, tx: BaseTransaction, *, only_metadata: bool = False) -> None: - super().save_transaction(tx, only_metadata=only_metadata) + def save_transaction(self, tx: BaseTransaction, *, only_metadata: bool = False, + add_to_indexes: bool = False) -> None: + super().save_transaction(tx, only_metadata=only_metadata, add_to_indexes=add_to_indexes) self._save_transaction(tx, only_metadata=only_metadata) def _save_transaction(self, tx: BaseTransaction, *, only_metadata: bool = False) -> None: diff --git a/hathor/transaction/storage/old_rocksdb_storage.py b/hathor/transaction/storage/old_rocksdb_storage.py index 7035672f4..f3ef0fa34 100644 --- a/hathor/transaction/storage/old_rocksdb_storage.py +++ b/hathor/transaction/storage/old_rocksdb_storage.py @@ -54,8 +54,9 @@ def remove_transaction(self, tx: 'BaseTransaction') -> None: self._db.delete(tx.hash) self._remove_from_weakref(tx) - def save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False) -> None: - super().save_transaction(tx, only_metadata=only_metadata) + def save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False, + add_to_indexes: bool = False) -> None: + super().save_transaction(tx, only_metadata=only_metadata, add_to_indexes=add_to_indexes) self._save_transaction(tx, only_metadata=only_metadata) self._save_to_weakref(tx) diff --git a/hathor/transaction/storage/rocksdb_storage.py 
b/hathor/transaction/storage/rocksdb_storage.py index 0706af951..b35698b32 100644 --- a/hathor/transaction/storage/rocksdb_storage.py +++ b/hathor/transaction/storage/rocksdb_storage.py @@ -128,8 +128,9 @@ def remove_transaction(self, tx: 'BaseTransaction') -> None: self._db.delete((self._cf_meta, tx.hash)) self._remove_from_weakref(tx) - def save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False) -> None: - super().save_transaction(tx, only_metadata=only_metadata) + def save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False, + add_to_indexes: bool = False) -> None: + super().save_transaction(tx, only_metadata=only_metadata, add_to_indexes=add_to_indexes) self._save_transaction(tx, only_metadata=only_metadata) self._save_to_weakref(tx) diff --git a/hathor/transaction/storage/transaction_storage.py b/hathor/transaction/storage/transaction_storage.py index e49302331..9070e6522 100644 --- a/hathor/transaction/storage/transaction_storage.py +++ b/hathor/transaction/storage/transaction_storage.py @@ -16,7 +16,7 @@ from abc import ABC, abstractmethod, abstractproperty from collections import deque from threading import Lock -from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, cast +from typing import Any, Dict, FrozenSet, Iterable, Iterator, List, NamedTuple, Optional, Set, Tuple, cast from weakref import WeakValueDictionary from intervaltree.interval import Interval @@ -25,14 +25,21 @@ from hathor.conf import HathorSettings from hathor.indexes import IndexesManager, TokensIndex, TransactionsIndex, WalletIndex from hathor.pubsub import HathorEvents, PubSubManager +from hathor.transaction.base_transaction import BaseTransaction from hathor.transaction.block import Block +from hathor.transaction.storage.block_height_index import BlockHeightIndex from hathor.transaction.storage.exceptions import TransactionDoesNotExist, TransactionIsNotABlock -from hathor.transaction.transaction import BaseTransaction 
-from hathor.transaction.transaction_metadata import TransactionMetadata +from hathor.transaction.storage.traversal import BFSWalk +from hathor.transaction.transaction import Transaction +from hathor.transaction.transaction_metadata import TransactionMetadata, ValidationState +from hathor.util import not_none settings = HathorSettings() +INF_HEIGHT: int = 1_000_000_000_000 + + class AllTipsCache(NamedTuple): timestamp: int tips: Set[Interval] @@ -40,6 +47,14 @@ class AllTipsCache(NamedTuple): hashes: List[bytes] +class _DirDepValue(Dict[bytes, ValidationState]): + """This class is used to add a handy method to values on dependency indexes.""" + + def is_ready(self) -> bool: + """True if all deps' validation are fully connected.""" + return all(val.is_fully_connected() for val in self.values()) + + class TransactionStorage(ABC): """Legacy sync interface, please copy @deprecated decorator when implementing methods.""" @@ -53,6 +68,8 @@ class TransactionStorage(ABC): log = get_logger() def __init__(self): + from hathor.transaction.genesis import BLOCK_GENESIS + # Weakref is used to guarantee that there is only one instance of each transaction in memory. self._tx_weakref: WeakValueDictionary[bytes, BaseTransaction] = WeakValueDictionary() self._tx_weakref_disabled: bool = False @@ -94,6 +111,310 @@ def __init__(self): # Key storage attribute to save if the node has clean db self._clean_db_attribute: str = 'clean_db' + # Cache of block hash by height + self._block_height_index = BlockHeightIndex() + + # Direct and reverse dependency mapping (i.e. 
needs and needed by) + self._dir_dep_index: Dict[bytes, _DirDepValue] = {} + self._rev_dep_index: Dict[bytes, Set[bytes]] = {} + self._txs_with_deps_ready: Set[bytes] = set() + + # Needed txs (key: tx missing, value: requested by) + self._needed_txs_index: Dict[bytes, Tuple[int, bytes]] = {} + + # Hold txs that have not been confirmed + self._tx_tips_index: Set[bytes] = set() + + # Hold blocks that can be used as the next parent block + # XXX: if there is more than one they must all have the same score, must always have at least one hash + self._parent_blocks_index: Set[bytes] = {BLOCK_GENESIS.hash} + + # rev-dep-index methods: + + def count_deps_index(self) -> int: + """Count total number of txs with dependencies.""" + return len(self._dir_dep_index) + + def _get_validation_state(self, tx: bytes) -> ValidationState: + """Query database for the validation state of a transaction, returns INITIAL when tx does not exist.""" + tx_meta = self.get_metadata(tx) + if tx_meta is None: + return ValidationState.INITIAL + return tx_meta.validation + + def _update_deps(self, deps: _DirDepValue) -> None: + """Propagate the new validation state of the given deps.""" + for tx, validation in deps.items(): + self._update_validation(tx, validation) + + def _update_validation(self, tx: bytes, validation: ValidationState) -> None: + """Propagate the new validation state of a given dep.""" + for cousin in self._rev_dep_index[tx].copy(): + deps = self._dir_dep_index[cousin] + # XXX: this check serves to avoid calling is_ready() when nothing changed + if deps[tx] != validation: + deps[tx] = validation + if deps.is_ready(): + self.del_from_deps_index(cousin) + self._txs_with_deps_ready.add(cousin) + + def add_to_deps_index(self, tx: bytes, deps: Iterable[bytes]) -> None: + """Call to add all dependencies a transaction has.""" + # deps are immutable for a given hash + _deps = _DirDepValue((dep, self._get_validation_state(dep)) for dep in deps) + # short circuit add directly to ready + if 
_deps.is_ready(): + self._txs_with_deps_ready.add(tx) + return + # add direct deps + if __debug__ and tx in self._dir_dep_index: + # XXX: dependencies set must be immutable + assert self._dir_dep_index[tx].keys() == _deps.keys() + self._dir_dep_index[tx] = _deps + # add reverse dep + for rev_dep in _deps: + if rev_dep not in self._rev_dep_index: + self._rev_dep_index[rev_dep] = set() + self._rev_dep_index[rev_dep].add(tx) + + def del_from_deps_index(self, tx: bytes) -> None: + """Call to remove tx from all reverse dependencies, for example when validation is complete.""" + _deps = self._dir_dep_index.pop(tx, _DirDepValue()) + for rev_dep in _deps.keys(): + rev_deps = self._rev_dep_index[rev_dep] + if tx in rev_deps: + rev_deps.remove(tx) + if not rev_deps: + del self._rev_dep_index[rev_dep] + + def is_ready_for_validation(self, tx: bytes) -> bool: + """ Whether a tx can be fully validated (implies fully connected). + """ + return tx in self._txs_with_deps_ready + + def remove_ready_for_validation(self, tx: bytes) -> None: + """ Removes from ready for validation set. + """ + self._txs_with_deps_ready.discard(tx) + + def next_ready_for_validation(self, *, dry_run: bool = False) -> Iterator[bytes]: + """ Yields and removes all txs ready for validation even if they become ready while iterating. + """ + if dry_run: + cur_ready = self._txs_with_deps_ready.copy() + else: + cur_ready, self._txs_with_deps_ready = self._txs_with_deps_ready, set() + while cur_ready: + yield from iter(cur_ready) + if dry_run: + cur_ready = self._txs_with_deps_ready - cur_ready + else: + cur_ready, self._txs_with_deps_ready = self._txs_with_deps_ready, set() + + def iter_deps_index(self) -> Iterator[bytes]: + """Iterate through all hashes depended by any tx or block.""" + yield from self._rev_dep_index.keys() + + def get_rev_deps(self, tx: bytes) -> FrozenSet[bytes]: + """Get all txs that depend on the given tx (i.e. 
its reverse dependencies).""" + return frozenset(self._rev_dep_index.get(tx, set())) + + # needed-txs-index methods: + + def has_needed_tx(self) -> bool: + """Whether there is any tx on the needed tx index.""" + return bool(self._needed_txs_index) + + def is_tx_needed(self, tx: bytes) -> bool: + """Whether a tx is in the requested tx list.""" + return tx in self._needed_txs_index + + def needed_index_height(self, tx: bytes) -> int: + """Indexed height from the needed tx index.""" + return self._needed_txs_index[tx][0] + + def remove_from_needed_index(self, tx: bytes) -> None: + """Remove tx from needed txs index, tx doesn't need to be in the index.""" + self._needed_txs_index.pop(tx, None) + + def get_next_needed_tx(self) -> bytes: + """Choose the start hash for downloading the needed txs""" + # This strategy maximizes the chance to download multiple txs on the same stream + # find the tx with highest "height" + # XXX: we could cache this onto `needed_txs` so we don't have to fetch txs every time + height, start_hash, tx = max((h, s, t) for t, (h, s) in self._needed_txs_index.items()) + self.log.debug('next needed tx start', needed=len(self._needed_txs_index), start=start_hash.hex(), + height=height, needed_tx=tx.hex()) + return start_hash + + def add_needed_deps(self, tx: BaseTransaction) -> None: + if isinstance(tx, Block): + height = tx.get_metadata().height + else: + assert isinstance(tx, Transaction) + first_block = tx.get_metadata().first_block + if first_block is None: + # XXX: consensus did not run yet to update first_block, what should we do? 
+ # I'm defaulting the height to `inf` (practically), this should make it heightest priority when + # choosing which transactions to fetch next + height = INF_HEIGHT + else: + block = self.get_transaction(first_block) + assert isinstance(block, Block) + height = block.get_metadata().height + # get_tx_parents is used instead of get_tx_dependencies because the remote will traverse the parent + # tree, not the dependency tree, eventually we should receive all tx dependencies and be able to validate + # this transaction + for tx_hash in tx.get_tx_parents(): + # It may happen that we have one of the dependencies already, so just add the ones we don't + # have. We should add at least one dependency, otherwise this tx should be full validated + if not self.transaction_exists(tx_hash): + self._needed_txs_index[tx_hash] = (height, not_none(tx.hash)) + + # parent-blocks-index methods: + + def add_to_parent_blocks_index(self, block: bytes) -> None: + from math import isclose + meta = not_none(self.get_metadata(block)) + new_score = not_none(meta).score + cur_score = not_none(self.get_metadata(next(iter(self._parent_blocks_index)))).score + if isclose(new_score, cur_score): + self.log.debug('same score: new competing parent block', block=block.hex()) + self._parent_blocks_index.add(block) + elif new_score > cur_score and not meta.voided_by: + # If it's a high score, then I can't add one that is voided + self.log.debug('high score: new best parent block', block=block.hex()) + self._parent_blocks_index.clear() + self._parent_blocks_index.add(block) + else: + self.log.debug('low score: skip parent block', block=block.hex()) + + # tx-tips-index methods: + + def iter_tx_tips(self, max_timestamp: Optional[float] = None) -> Iterator[Transaction]: + """ + Iterate over txs that are tips, a subset of the mempool (i.e. not tx-parent of another tx on the mempool). 
+ """ + it = map(self.get_transaction, self._tx_tips_index) + if max_timestamp is not None: + it = filter(lambda tx: tx.timestamp < not_none(max_timestamp), it) + yield from cast(Iterator[Transaction], it) + + def remove_from_tx_tips_index(self, remove_txs: Set[bytes]) -> None: + """ + This should be called to remove a transaction from the "mempool", usually when it is confirmed by a block. + """ + self._tx_tips_index -= remove_txs + + def iter_mempool(self) -> Iterator[Transaction]: + """ + Iterate over the transactions on the "mempool", even the ones that are not tips. + """ + bfs = BFSWalk(self, is_dag_verifications=True, is_left_to_right=False) + for tx in bfs.run(map(self.get_transaction, self._tx_tips_index), skip_root=False): + assert isinstance(tx, Transaction) + yield tx + if tx.get_metadata().first_block is not None: + bfs.skip_neighbors(tx) + + def update_tx_tips(self, tx: BaseTransaction) -> None: + """ + This should be called when a new `tx` is created and added to the "mempool". + """ + # A new tx/block added might cause a tx in the tips to become voided. For instance, + # there might be a tx1 a double spending tx2, where tx1 is valid and tx2 voided. A new block + # confirming tx2 will make it valid while tx1 becomes voided, so it has to be removed + # from the tips. + assert tx.hash is not None + to_remove: Set[bytes] = set() + to_remove_parents: Set[bytes] = set() + for tip_tx in self.iter_tx_tips(): + assert tip_tx.hash is not None + # A new tx/block added might cause a tx in the tips to become voided. For instance, + # there might be twin txs, tx1 and tx2, where tx1 is valid and tx2 voided. A new block + # confirming tx2 will make it valid while tx1 becomes voided, so it has to be removed + # from the tips. 
The txs confirmed by tx1 need to be double checked, as they might + # themselves become tips (hence we use to_remove_parents) + meta = tip_tx.get_metadata() + if meta.voided_by: + to_remove.add(tip_tx.hash) + to_remove_parents.update(tip_tx.parents) + continue + + # might also happen that a tip has a child that became valid, so it's not a tip anymore + confirmed = False + for child_meta in map(self.get_metadata, meta.children): + assert child_meta is not None + if not child_meta.voided_by: + confirmed = True + break + if confirmed: + to_remove.add(tip_tx.hash) + + if to_remove: + self.remove_from_tx_tips_index(to_remove) + self.log.debug('removed voided txs from tips', txs=[tx.hex() for tx in to_remove]) + + # Check if any of the txs being confirmed by the voided txs is a tip again. This happens + # if it doesn't have any other valid child. + to_add = set() + for tx_hash in to_remove_parents: + confirmed = False + # check if it has any valid children + meta = not_none(self.get_metadata(tx_hash)) + if meta.voided_by: + continue + children = meta.children + for child_meta in map(self.get_metadata, children): + assert child_meta is not None + if not child_meta.voided_by: + confirmed = True + break + if not confirmed: + to_add.add(tx_hash) + + if to_add: + self._tx_tips_index.update(to_add) + self.log.debug('added txs to tips', txs=[tx.hex() for tx in to_add]) + + if tx.get_metadata().voided_by: + # this tx is voided, don't need to update the tips + self.log.debug('voided tx, won\'t add it as a tip', tx=tx.hash_hex) + return + + self.remove_from_tx_tips_index(set(tx.parents)) + + if tx.is_transaction and tx.get_metadata().first_block is None: + self._tx_tips_index.add(tx.hash) + + # block height index methods: + + def update_block_height_cache_new_chain(self, height: int, block: Block) -> None: + """ When we have a new winner chain we must update all the height index + until the first height with a common block + """ + assert self.get_from_block_height_index(height) != 
block.hash + + block_height = height + side_chain_block = block + add_to_cache: List[Tuple[int, bytes]] = [] + while self.get_from_block_height_index(block_height) != side_chain_block.hash: + add_to_cache.append((block_height, not_none(side_chain_block.hash))) + + side_chain_block = side_chain_block.get_block_parent() + new_block_height = side_chain_block.get_metadata().height + assert new_block_height + 1 == block_height + block_height = new_block_height + + # Reverse the data because I was adding in the array from the highest block + reversed_add_to_cache = add_to_cache[::-1] + + for height, tx_hash in reversed_add_to_cache: + # Add it to the index + self.add_reorg_to_block_height_index(height, tx_hash) + + # all other methods: + def is_empty(self) -> bool: """True when only genesis is present, useful for checking for a fresh database.""" return self.get_count_tx_blocks() <= 3 @@ -102,6 +423,13 @@ def pre_init(self) -> None: """Storages can implement this to run code before transaction loading starts""" pass + def get_best_block(self) -> Block: + """The block with highest score or one of the blocks with highest scores. 
Can be used for mining.""" + block_hash = self._block_height_index.get_tip() + block = self.get_transaction(block_hash) + assert isinstance(block, Block) + return block + def _save_or_verify_genesis(self) -> None: """Save all genesis in the storage.""" for tx in self._get_genesis_from_settings(): @@ -110,7 +438,7 @@ def _save_or_verify_genesis(self) -> None: tx2 = self.get_transaction(tx.hash) assert tx == tx2 except TransactionDoesNotExist: - self.save_transaction(tx) + self.save_transaction(tx, add_to_indexes=True) tx2 = tx assert tx2.hash is not None self._genesis_cache[tx2.hash] = tx2 @@ -158,23 +486,32 @@ def _disable_weakref(self) -> None: self._tx_weakref_disabled = True @abstractmethod - def save_transaction(self: 'TransactionStorage', tx: BaseTransaction, *, only_metadata: bool = False) -> None: + def save_transaction(self: 'TransactionStorage', tx: BaseTransaction, *, only_metadata: bool = False, + add_to_indexes: bool = False) -> None: # XXX: although this method is abstract (because a subclass must implement it) the implementer # should call the base implementation for correctly interacting with the index """Saves the tx. 
:param tx: Transaction to save :param only_metadata: Don't save the transaction, only the metadata of this transaction + :param add_to_indexes: Add this transaction to the indexes """ meta = tx.get_metadata() + if tx.hash in self._rev_dep_index: + self._update_validation(tx.hash, meta.validation) + + # XXX: we can only add to cache and publish txs that are complete, having the parents and all inputs + if not meta.validation.is_valid(): + return + if self.pubsub: if not meta.voided_by: self.pubsub.publish(HathorEvents.STORAGE_TX_WINNER, tx=tx) else: self.pubsub.publish(HathorEvents.STORAGE_TX_VOIDED, tx=tx) - if self.with_index and not only_metadata: - self._add_to_cache(tx) + if self.with_index and add_to_indexes: + self.add_to_indexes(tx) @abstractmethod def remove_transaction(self, tx: BaseTransaction) -> None: @@ -185,8 +522,8 @@ def remove_transaction(self, tx: BaseTransaction) -> None: if self.with_index: assert self.all_index is not None - self._del_from_cache(tx, relax_assert=True) - # TODO Move it to self._del_from_cache. We cannot simply do it because + self.del_from_indexes(tx, relax_assert=True) + # TODO Move it to self.del_from_indexes. We cannot simply do it because # this method is used by the consensus algorithm which does not # expect to have it removed from self.all_index. 
self.all_index.del_tx(tx, relax_assert=True) @@ -294,10 +631,11 @@ def get_best_block_tips(self, timestamp: Optional[float] = None, *, skip_cache: """ if timestamp is None and not skip_cache and self._best_block_tips is not None: return self._best_block_tips + best_score = 0.0 - best_tip_blocks = [] # List[bytes(hash)] - tip_blocks = [x.data for x in self.get_block_tips(timestamp)] - for block_hash in tip_blocks: + best_tip_blocks: List[bytes] = [] + + for block_hash in (x.data for x in self.get_block_tips(timestamp)): meta = self.get_metadata(block_hash) assert meta is not None if meta.voided_by and meta.voided_by != set([block_hash]): @@ -456,11 +794,11 @@ def _topological_sort(self) -> Iterator[BaseTransaction]: raise NotImplementedError @abstractmethod - def _add_to_cache(self, tx: BaseTransaction) -> None: + def add_to_indexes(self, tx: BaseTransaction) -> None: raise NotImplementedError @abstractmethod - def _del_from_cache(self, tx: BaseTransaction, *, relax_assert: bool = False) -> None: + def del_from_indexes(self, tx: BaseTransaction, *, relax_assert: bool = False) -> None: raise NotImplementedError @abstractmethod @@ -563,6 +901,18 @@ def is_db_clean(self) -> bool: """ return self.get_value(self._clean_db_attribute) == '1' + def add_new_to_block_height_index(self, height: int, block_hash: bytes) -> None: + """Add a new block to the height index that must not result in a re-org""" + self._block_height_index.add(height, block_hash) + + def add_reorg_to_block_height_index(self, height: int, block_hash: bytes) -> None: + """Add a new block to the height index that can result in a re-org""" + # XXX: in the future we can make this more strict so that it MUST result in a re-org + self._block_height_index.add(height, block_hash, can_reorg=True) + + def get_from_block_height_index(self, height: int) -> bytes: + return self._block_height_index.get(height) + class BaseTransactionStorage(TransactionStorage): def __init__(self, with_index: bool = True, pubsub: 
Optional[Any] = None) -> None: @@ -640,11 +990,12 @@ def get_tx_tips(self, timestamp: Optional[float] = None) -> Set[Interval]: timestamp = self.latest_timestamp tips = self.tx_index.tips_index[timestamp] - # This `for` is for assert only. How to skip it when running with `-O` parameter? - for interval in tips: - meta = self.get_metadata(interval.data) - assert meta is not None - assert not meta.voided_by + if __debug__: + # XXX: this `for` is for assert only and thus is inside `if __debug__:` + for interval in tips: + meta = self.get_metadata(interval.data) + assert meta is not None + # assert not meta.voided_by return tips @@ -721,7 +1072,7 @@ def _manually_initialize(self) -> None: # We need to construct a topological sort, then iterate from # genesis to tips. for tx in self._topological_sort(): - self._add_to_cache(tx) + self.add_to_indexes(tx) def _topological_sort(self) -> Iterator[BaseTransaction]: # TODO We must optimize this algorithm to remove the `visited` set. @@ -761,15 +1112,25 @@ def _topological_sort_dfs(self, root: BaseTransaction, visited: Dict[bytes, int] # matter. 
for parent_hash in tx.parents[::-1]: if parent_hash not in visited: - parent = self.get_transaction(parent_hash) - stack.append(parent) + try: + parent = self.get_transaction(parent_hash) + except TransactionDoesNotExist: + # XXX: it's possible transactions won't exist because of missing dependencies + pass + else: + stack.append(parent) for txin in tx.inputs: if txin.tx_id not in visited: - txinput = self.get_transaction(txin.tx_id) - stack.append(txinput) - - def _add_to_cache(self, tx: BaseTransaction) -> None: + try: + txinput = self.get_transaction(txin.tx_id) + except TransactionDoesNotExist: + # XXX: it's possible transactions won't exist because of missing dependencies + pass + else: + stack.append(txinput) + + def add_to_indexes(self, tx: BaseTransaction) -> None: if not self.with_index: raise NotImplementedError assert self.all_index is not None @@ -794,7 +1155,7 @@ def _add_to_cache(self, tx: BaseTransaction) -> None: if self.tx_index.add_tx(tx): self._cache_tx_count += 1 - def _del_from_cache(self, tx: BaseTransaction, *, relax_assert: bool = False) -> None: + def del_from_indexes(self, tx: BaseTransaction, *, relax_assert: bool = False) -> None: if not self.with_index: raise NotImplementedError assert self.block_index is not None diff --git a/hathor/wallet/base_wallet.py b/hathor/wallet/base_wallet.py index 30aaebb0c..0377effdd 100644 --- a/hathor/wallet/base_wallet.py +++ b/hathor/wallet/base_wallet.py @@ -521,6 +521,7 @@ def on_new_tx(self, tx: BaseTransaction) -> None: script_type_out = parse_address_script(output.script) if script_type_out: if script_type_out.address in self.keys: + self.log.debug('detected tx output', tx=tx.hash_hex, index=index, address=script_type_out.address) token_id = tx.get_token_uid(output.get_token_index()) # this wallet received tokens utxo = UnspentTx(tx.hash, index, output.value, tx.timestamp, script_type_out.address, diff --git a/tests/resources/transaction/test_tips_histogram.py 
b/tests/resources/transaction/test_tips_histogram.py deleted file mode 100644 index d0cf5736b..000000000 --- a/tests/resources/transaction/test_tips_histogram.py +++ /dev/null @@ -1,72 +0,0 @@ -import time - -from twisted.internet.defer import inlineCallbacks - -from hathor.transaction.resources import TipsHistogramResource -from tests.resources.base_resource import StubSite, _BaseResourceTest -from tests.utils import add_blocks_unlock_reward, add_new_blocks, add_new_transactions - - -class TipsTest(_BaseResourceTest._ResourceTest): - def setUp(self): - super().setUp() - self.web = StubSite(TipsHistogramResource(self.manager)) - self.manager.wallet.unlock(b'MYPASS') - self.manager.reactor.advance(time.time()) - - @inlineCallbacks - def test_get_tips_histogram(self): - # Add blocks to have funds - add_new_blocks(self.manager, 2, 2) - add_blocks_unlock_reward(self.manager) - - txs = add_new_transactions(self.manager, 10, 2) - - response1 = yield self.web.get("tips-histogram", { - b'begin': str(txs[0].timestamp).encode(), - b'end': str(txs[0].timestamp).encode() - }) - data1 = response1.json_value() - self.assertTrue(data1['success']) - self.assertEqual(len(data1['tips']), 1) - self.assertEqual([txs[0].timestamp, 1], data1['tips'][0]) - - response2 = yield self.web.get("tips-histogram", { - b'begin': str(txs[0].timestamp).encode(), - b'end': str(txs[0].timestamp + 1).encode() - }) - data2 = response2.json_value() - self.assertTrue(data2['success']) - self.assertEqual(len(data2['tips']), 2) - self.assertEqual([txs[0].timestamp, 1], data2['tips'][0]) - self.assertEqual([txs[0].timestamp + 1, 1], data2['tips'][1]) - - response3 = yield self.web.get("tips-histogram", { - b'begin': str(txs[0].timestamp).encode(), - b'end': str(txs[-1].timestamp).encode() - }) - data3 = response3.json_value() - self.assertTrue(data3['success']) - self.assertEqual(len(data3['tips']), 19) - - @inlineCallbacks - def test_invalid_params(self): - # missing end param - response = yield 
self.web.get("tips-histogram", {b'begin': b'0'}) - data = response.json_value() - self.assertFalse(data['success']) - - # wrong end param - response = yield self.web.get("tips-histogram", {b'begin': b'a', b'end': b'10'}) - data = response.json_value() - self.assertFalse(data['success']) - - # missing begin param - response = yield self.web.get("tips-histogram", {b'end': b'0'}) - data = response.json_value() - self.assertFalse(data['success']) - - # wrong begin param - response = yield self.web.get("tips-histogram", {b'begin': b'0', b'end': b'a'}) - data = response.json_value() - self.assertFalse(data['success']) diff --git a/tests/tx/test_indexes.py b/tests/tx/test_indexes.py new file mode 100644 index 000000000..9f61930c4 --- /dev/null +++ b/tests/tx/test_indexes.py @@ -0,0 +1,134 @@ +from hathor.crypto.util import decode_address +from hathor.transaction import Transaction +from hathor.transaction.storage import TransactionMemoryStorage +from hathor.wallet import Wallet +from tests import unittest +from tests.utils import add_blocks_unlock_reward, add_new_blocks, get_genesis_key + + +class BasicTransaction(unittest.TestCase): + def setUp(self): + super().setUp() + self.wallet = Wallet() + self.tx_storage = TransactionMemoryStorage() + self.genesis = self.tx_storage.get_all_genesis() + self.genesis_blocks = [tx for tx in self.genesis if tx.is_block] + self.genesis_txs = [tx for tx in self.genesis if not tx.is_block] + + # read genesis keys + self.genesis_private_key = get_genesis_key() + self.genesis_public_key = self.genesis_private_key.public_key() + + # this makes sure we can spend the genesis outputs + self.manager = self.create_peer('testnet', tx_storage=self.tx_storage, unlock_wallet=True, wallet_index=True) + blocks = add_blocks_unlock_reward(self.manager) + self.last_block = blocks[-1] + + def test_tx_tips_with_conflict(self): + from hathor.wallet.base_wallet import WalletOutputInfo + + add_new_blocks(self.manager, 5, advance_clock=15) + 
add_blocks_unlock_reward(self.manager) + + address = self.get_address(0) + value = 500 + + outputs = [WalletOutputInfo(address=decode_address(address), value=value, timelock=None)] + + tx1 = self.manager.wallet.prepare_transaction_compute_inputs(Transaction, outputs, self.manager.tx_storage) + tx1.weight = 2.0 + tx1.parents = self.manager.get_new_tx_parents() + tx1.timestamp = int(self.clock.seconds()) + tx1.resolve() + self.assertTrue(self.manager.propagate_tx(tx1, False)) + self.assertEqual( + {tx.hash for tx in self.manager.tx_storage.iter_tx_tips()}, + {tx1.hash} + ) + + outputs = [WalletOutputInfo(address=decode_address(address), value=value, timelock=None)] + + tx2 = self.manager.wallet.prepare_transaction_compute_inputs(Transaction, outputs, self.manager.tx_storage) + tx2.weight = 2.0 + tx2.parents = [tx1.hash] + self.manager.get_new_tx_parents()[1:] + self.assertIn(tx1.hash, tx2.parents) + tx2.timestamp = int(self.clock.seconds()) + 1 + tx2.resolve() + self.assertTrue(self.manager.propagate_tx(tx2, False)) + self.assertEqual( + {tx.hash for tx in self.manager.tx_storage.iter_tx_tips()}, + {tx2.hash} + ) + + tx3 = Transaction.create_from_struct(tx2.get_struct()) + tx3.timestamp = tx2.timestamp + 1 + self.assertIn(tx1.hash, tx3.parents) + tx3.resolve() + self.assertNotEqual(tx2.hash, tx3.hash) + self.assertTrue(self.manager.propagate_tx(tx3, False)) + self.assertIn(tx3.hash, tx2.get_metadata().conflict_with) + self.assertEqual( + {tx.hash for tx in self.manager.tx_storage.iter_tx_tips()}, + # XXX: what should we expect here? 
I don't think we should exclude both tx2 and tx3, but maybe let the + # function using the index decide + # {tx1.hash, tx3.hash} + {tx1.hash} + ) + + def test_tx_tips_voided(self): + from hathor.wallet.base_wallet import WalletOutputInfo + + add_new_blocks(self.manager, 5, advance_clock=15) + add_blocks_unlock_reward(self.manager) + + address1 = self.get_address(0) + address2 = self.get_address(1) + address3 = self.get_address(2) + output1 = WalletOutputInfo(address=decode_address(address1), value=123, timelock=None) + output2 = WalletOutputInfo(address=decode_address(address2), value=234, timelock=None) + output3 = WalletOutputInfo(address=decode_address(address3), value=345, timelock=None) + outputs = [output1, output2, output3] + + tx1 = self.manager.wallet.prepare_transaction_compute_inputs(Transaction, outputs, self.manager.tx_storage) + tx1.weight = 2.0 + tx1.parents = self.manager.get_new_tx_parents() + tx1.timestamp = int(self.clock.seconds()) + tx1.resolve() + self.assertTrue(self.manager.propagate_tx(tx1, False)) + self.assertEqual( + {tx.hash for tx in self.manager.tx_storage.iter_tx_tips()}, + {tx1.hash} + ) + + tx2 = self.manager.wallet.prepare_transaction_compute_inputs(Transaction, outputs, self.manager.tx_storage) + tx2.weight = 2.0 + tx2.parents = [tx1.hash] + self.manager.get_new_tx_parents()[1:] + self.assertIn(tx1.hash, tx2.parents) + tx2.timestamp = int(self.clock.seconds()) + 1 + tx2.resolve() + self.assertTrue(self.manager.propagate_tx(tx2, False)) + self.assertEqual( + {tx.hash for tx in self.manager.tx_storage.iter_tx_tips()}, + {tx2.hash} + ) + + tx3 = Transaction.create_from_struct(tx2.get_struct()) + tx3.weight = 3.0 + # tx3.timestamp = tx2.timestamp + 1 + tx3.parents = tx1.parents + # self.assertIn(tx1.hash, tx3.parents) + tx3.resolve() + self.assertNotEqual(tx2.hash, tx3.hash) + self.assertTrue(self.manager.propagate_tx(tx3, False)) + # self.assertIn(tx3.hash, tx2.get_metadata().voided_by) + self.assertIn(tx3.hash, 
tx2.get_metadata().conflict_with) + self.assertEqual( + {tx.hash for tx in self.manager.tx_storage.iter_tx_tips()}, + # XXX: what should we expect here? I don't think we should exclude both tx2 and tx3, but maybe let the + # function using the index decide + {tx1.hash, tx3.hash} + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/tx/test_tx_storage.py b/tests/tx/test_tx_storage.py index ed403a653..e0e87f3b2 100644 --- a/tests/tx/test_tx_storage.py +++ b/tests/tx/test_tx_storage.py @@ -24,6 +24,7 @@ TransactionRocksDBStorage, ) from hathor.transaction.storage.exceptions import TransactionDoesNotExist +from hathor.transaction.transaction_metadata import ValidationState from hathor.wallet import Wallet from tests.utils import ( BURN_ADDRESS, @@ -47,6 +48,8 @@ class _BaseTransactionStorageTest: class _TransactionStorageTest(unittest.TestCase): def setUp(self, tx_storage, reactor=None): + from hathor.manager import HathorManager + if not reactor: self.reactor = Clock() else: @@ -61,7 +64,6 @@ def setUp(self, tx_storage, reactor=None): self.genesis_blocks = [tx for tx in self.genesis if tx.is_block] self.genesis_txs = [tx for tx in self.genesis if not tx.is_block] - from hathor.manager import HathorManager self.tmpdir = tempfile.mkdtemp() wallet = Wallet(directory=self.tmpdir) wallet.unlock(b'teste') @@ -76,6 +78,7 @@ def setUp(self, tx_storage, reactor=None): nonce=100781, storage=tx_storage) self.block.resolve() self.block.verify() + self.block.get_metadata().validation = ValidationState.FULL tx_parents = [tx.hash for tx in self.genesis_txs] tx_input = TxInput( @@ -91,6 +94,7 @@ def setUp(self, tx_storage, reactor=None): tokens=[bytes.fromhex('0023be91834c973d6a6ddd1a0ae411807b7c8ef2a015afb5177ee64b666ce602')], parents=tx_parents, storage=tx_storage) self.tx.resolve() + self.tx.get_metadata().validation = ValidationState.FULL # Disable weakref to test the internal methods. Otherwise, most methods return objects from weakref. 
self.tx_storage._disable_weakref() @@ -151,7 +155,7 @@ def test_storage_basic(self): self.assertEqual(set(tx_parents_hash), {self.genesis_txs[0].hash, self.genesis_txs[1].hash}) def validate_save(self, obj): - self.tx_storage.save_transaction(obj) + self.tx_storage.save_transaction(obj, add_to_indexes=True) loaded_obj1 = self.tx_storage.get_transaction(obj.hash) @@ -170,7 +174,7 @@ def validate_save(self, obj): else: self.assertTrue(obj.hash in self.tx_storage.tx_index.tips_index.tx_last_interval) - self.tx_storage._del_from_cache(obj) + self.tx_storage.del_from_indexes(obj) if self.tx_storage.with_index: if obj.is_block: @@ -178,7 +182,7 @@ def validate_save(self, obj): else: self.assertFalse(obj.hash in self.tx_storage.tx_index.tips_index.tx_last_interval) - self.tx_storage._add_to_cache(obj) + self.tx_storage.add_to_indexes(obj) if self.tx_storage.with_index: if obj.is_block: self.assertTrue(obj.hash in self.tx_storage.block_index.tips_index.tx_last_interval) @@ -193,6 +197,7 @@ def test_save_tx(self): def test_save_token_creation_tx(self): tx = create_tokens(self.manager, propagate=False) + tx.get_metadata().validation = ValidationState.FULL self.validate_save(tx) def _validate_not_in_index(self, tx, index):