diff --git a/hathor/manager.py b/hathor/manager.py index 32ed72691..ec4946246 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -437,7 +437,7 @@ def _initialize_components_full_verification(self) -> None: if self.tx_storage.indexes.mempool_tips is not None: self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update if self.tx_storage.indexes.deps is not None: - self.sync_v2_step_validations([tx]) + self.sync_v2_step_validations([tx], quiet=True) self.tx_storage.save_transaction(tx, only_metadata=True) else: assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification) @@ -653,7 +653,7 @@ def _sync_v2_resume_validations(self) -> None: tx = self.tx_storage.get_transaction(tx_hash) if tx.get_metadata().validation.is_final(): depended_final_txs.append(tx) - self.sync_v2_step_validations(depended_final_txs) + self.sync_v2_step_validations(depended_final_txs, quiet=False) self.log.debug('pending validations finished') def add_listen_address(self, addr: str) -> None: @@ -910,8 +910,7 @@ def propagate_tx(self, tx: BaseTransaction, fails_silently: bool = True) -> bool @cpu.profiler('on_new_tx') def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = None, quiet: bool = False, fails_silently: bool = True, propagate_to_peers: bool = True, - skip_block_weight_verification: bool = False, sync_checkpoints: bool = False, - partial: bool = False, reject_locked_reward: bool = True) -> bool: + skip_block_weight_verification: bool = False, reject_locked_reward: bool = True) -> bool: """ New method for adding transactions or blocks that steps the validation state machine. :param tx: transaction to be added @@ -920,11 +919,8 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non :param fails_silently: if False will raise an exception when tx cannot be added :param propagate_to_peers: if True will relay the tx to other peers if it is accepted :param skip_block_weight_verification: if True will not check the tx PoW - :param sync_checkpoints: if True and also partial=True, will try to validate as a checkpoint and set the proper - validation state, this is used for adding txs from the sync-checkpoints phase - :param partial: if True will accept txs that can't be fully validated yet (because of missing parent/input) but - will run a basic validation of what can be validated (PoW and other basic fields) """ + assert self.tx_storage.is_only_valid_allowed() assert tx.hash is not None if self.tx_storage.transaction_exists(tx.hash): self.tx_storage.compare_bytes_with_local_tx(tx) @@ -958,86 +954,35 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non self.log.warn('on_new_tx(): previously marked as invalid', tx=tx.hash_hex) return False - # if partial=False (the default) we don't even try to partially validate transactions - if not partial or (metadata.validation.is_fully_connected() or tx.can_validate_full()): - if not metadata.validation.is_fully_connected(): - try: - tx.validate_full(sync_checkpoints=sync_checkpoints, reject_locked_reward=reject_locked_reward) - except HathorError as e: - if not fails_silently: - raise InvalidNewTransaction('full validation failed') from e - self.log.warn('on_new_tx(): full validation failed', tx=tx.hash_hex, exc_info=True) - return False - - # The method below adds the tx as a child of the parents - # This needs to be called right before the save because we were adding the children - # in the tx parents even if the tx was invalid (failing the verifications above) - # then I would have a children that was not in the storage - tx.update_initial_metadata(save=False) - self.tx_storage.save_transaction(tx) - self.tx_storage.add_to_indexes(tx) + if not metadata.validation.is_fully_connected(): try: - self.consensus_algorithm.update(tx) + tx.validate_full(reject_locked_reward=reject_locked_reward) except HathorError as e: if not fails_silently: - raise InvalidNewTransaction('consensus update failed') from e - self.log.warn('on_new_tx(): consensus update failed', tx=tx.hash_hex) - return False - else: - assert tx.validate_full(skip_block_weight_verification=True, reject_locked_reward=reject_locked_reward) - self.tx_storage.indexes.update(tx) - if self.tx_storage.indexes.mempool_tips: - self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update - self.tx_fully_validated(tx) - elif sync_checkpoints: - assert self.tx_storage.indexes.deps is not None - metadata.children = self.tx_storage.indexes.deps.known_children(tx) - try: - tx.validate_checkpoint(self.checkpoints) - except HathorError: - if not fails_silently: - raise InvalidNewTransaction('checkpoint validation failed') - self.log.warn('on_new_tx(): checkpoint validation failed', tx=tx.hash_hex, exc_info=True) + raise InvalidNewTransaction('full validation failed') from e + self.log.warn('on_new_tx(): full validation failed', tx=tx.hash_hex, exc_info=True) return False - self.tx_storage.save_transaction(tx) - else: - if isinstance(tx, Block) and not tx.has_basic_block_parent(): - if not fails_silently: - raise InvalidNewTransaction('block parent needs to be at least basic-valid') - self.log.warn('on_new_tx(): block parent needs to be at least basic-valid', tx=tx.hash_hex) - return False - if not tx.validate_basic(): - if not fails_silently: - raise InvalidNewTransaction('basic validation failed') - self.log.warn('on_new_tx(): basic validation failed', tx=tx.hash_hex) - return False - - # The method below adds the tx as a child of the parents - # This needs to be called right before the save because we were adding the children - # in the tx parents even if the tx was invalid (failing the verifications above) - # then I would have a children that was not in the storage - tx.update_initial_metadata(save=False) - self.tx_storage.save_transaction(tx) - if tx.is_transaction and self.tx_storage.indexes.deps is not None: - self.tx_storage.indexes.deps.remove_from_needed_index(tx.hash) - - if self.tx_storage.indexes.deps is not None: - try: - self.sync_v2_step_validations([tx]) - except (AssertionError, HathorError) as e: - if not fails_silently: - raise InvalidNewTransaction('step validations failed') from e - self.log.warn('on_new_tx(): step validations failed', tx=tx.hash_hex, exc_info=True) - return False + # The method below adds the tx as a child of the parents + # This needs to be called right before the save because we were adding the children + # in the tx parents even if the tx was invalid (failing the verifications above) + # then I would have a children that was not in the storage + tx.update_initial_metadata(save=False) + self.tx_storage.save_transaction(tx) + self.tx_storage.add_to_indexes(tx) + try: + self.consensus_algorithm.update(tx) + except HathorError as e: + if not fails_silently: + raise InvalidNewTransaction('consensus update failed') from e + self.log.warn('on_new_tx(): consensus update failed', tx=tx.hash_hex) + return False - if not quiet: - ts_date = datetime.datetime.fromtimestamp(tx.timestamp) - now = datetime.datetime.fromtimestamp(self.reactor.seconds()) - if tx.is_block: - self.log.info('new block', tx=tx, ts_date=ts_date, time_from_now=tx.get_time_from_now(now)) - else: - self.log.info('new tx', tx=tx, ts_date=ts_date, time_from_now=tx.get_time_from_now(now)) + assert tx.validate_full(skip_block_weight_verification=True, reject_locked_reward=reject_locked_reward) + self.tx_storage.indexes.update(tx) + if self.tx_storage.indexes.mempool_tips: + self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update + self.tx_fully_validated(tx, quiet=quiet) if propagate_to_peers: # Propagate to our peers. @@ -1045,7 +990,30 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non return True - def sync_v2_step_validations(self, txs: Iterable[BaseTransaction]) -> None: + def log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None: + """ A shortcut for logging additional information for block/txs. + """ + metadata = tx.get_metadata() + now = datetime.datetime.fromtimestamp(self.reactor.seconds()) + kwargs = { + 'tx': tx, + 'ts_date': datetime.datetime.fromtimestamp(tx.timestamp), + 'time_from_now': tx.get_time_from_now(now), + 'validation': metadata.validation.name, + } + if tx.is_block: + message = message_fmt.format('block') + if isinstance(tx, Block): + kwargs['height'] = tx.get_height() + else: + message = message_fmt.format('tx') + if not quiet: + log_func = self.log.info + else: + log_func = self.log.debug + log_func(message, **kwargs) + + def sync_v2_step_validations(self, txs: Iterable[BaseTransaction], *, quiet: bool) -> None: """ Step all validations until none can be stepped anymore. """ assert self.tx_storage.indexes is not None @@ -1072,9 +1040,9 @@ def sync_v2_step_validations(self, txs: Iterable[BaseTransaction]) -> None: self.tx_storage.indexes.update(tx) if self.tx_storage.indexes.mempool_tips: self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update - self.tx_fully_validated(tx) + self.tx_fully_validated(tx, quiet=quiet) - def tx_fully_validated(self, tx: BaseTransaction) -> None: + def tx_fully_validated(self, tx: BaseTransaction, *, quiet: bool) -> None: """ Handle operations that need to happen once the tx becomes fully validated. This might happen immediately after we receive the tx, if we have all dependencies @@ -1093,6 +1061,8 @@ def tx_fully_validated(self, tx: BaseTransaction) -> None: # TODO Remove it and use pubsub instead. self.wallet.on_new_tx(tx) + self.log_new_object(tx, 'new {}', quiet=quiet) + def listen(self, description: str, use_ssl: Optional[bool] = None) -> None: endpoint = self.connections.listen(description, use_ssl) # XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases