diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index 2fcf4b243..e15e8c914 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -411,13 +411,11 @@ def _get_or_create_nc_log_storage(self) -> NCLogStorage: def _get_or_create_consensus(self) -> ConsensusAlgorithm: if self._consensus is None: soft_voided_tx_ids = self._get_soft_voided_tx_ids() - pubsub = self._get_or_create_pubsub() nc_storage_factory = self._get_or_create_nc_storage_factory() nc_calls_sorter = self._get_nc_calls_sorter() self._consensus = ConsensusAlgorithm( nc_storage_factory=nc_storage_factory, soft_voided_tx_ids=soft_voided_tx_ids, - pubsub=pubsub, settings=self._get_or_create_settings(), runner_factory=self._get_or_create_runner_factory(), nc_log_storage=self._get_or_create_nc_log_storage(), @@ -657,7 +655,6 @@ def _get_or_create_vertex_handler(self) -> VertexHandler: feature_service=self._get_or_create_feature_service(), execution_manager=self._get_or_create_execution_manager(), pubsub=self._get_or_create_pubsub(), - wallet=self._get_or_create_wallet(), ) return self._vertex_handler diff --git a/hathor/consensus/consensus.py b/hathor/consensus/consensus.py index eaf1cda3b..dbf1b0fda 100644 --- a/hathor/consensus/consensus.py +++ b/hathor/consensus/consensus.py @@ -15,7 +15,8 @@ from __future__ import annotations from collections import defaultdict -from typing import TYPE_CHECKING, Callable, assert_never +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Callable, assert_never from structlog import get_logger @@ -27,7 +28,7 @@ from hathor.nanocontracts.exception import NCInvalidSignature from hathor.nanocontracts.execution import NCBlockExecutor from hathor.profiler import get_cpu_profiler -from hathor.pubsub import HathorEvents, PubSubManager +from hathor.pubsub import HathorEvents from hathor.transaction import BaseTransaction, Block, Transaction from hathor.transaction.exceptions import InvalidInputData, RewardLocked, TooManySigOps from hathor.util import not_none @@ -48,6 +49,12 @@ _base_transaction_log = logger.new() +@dataclass(slots=True, frozen=True, kw_only=True) +class ConsensusEvent: + event: HathorEvents + kwargs: dict[str, Any] + + class ConsensusAlgorithm: """Execute the consensus algorithm marking blocks and transactions as either executed or voided. @@ -78,7 +85,6 @@ def __init__( self, nc_storage_factory: 'NCStorageFactory', soft_voided_tx_ids: set[bytes], - pubsub: PubSubManager, *, settings: HathorSettings, tx_storage: TransactionStorage, @@ -90,7 +96,6 @@ def __init__( ) -> None: self._settings = settings self.log = logger.new() - self._pubsub = pubsub self.tx_storage = tx_storage self.nc_storage_factory = nc_storage_factory self.soft_voided_tx_ids = frozenset(soft_voided_tx_ids) @@ -114,10 +119,10 @@ def __init__( def create_context(self) -> ConsensusAlgorithmContext: """Handy method to create a context that can be used to access block and transaction algorithms.""" - return ConsensusAlgorithmContext(self, self._pubsub) + return ConsensusAlgorithmContext(self) @cpu.profiler(key=lambda self, base: 'consensus!{}'.format(base.hash.hex())) - def unsafe_update(self, base: BaseTransaction) -> None: + def unsafe_update(self, base: BaseTransaction) -> list[ConsensusEvent]: """ Run a consensus update with its own context, indexes will be updated accordingly. @@ -177,6 +182,8 @@ def unsafe_update(self, base: BaseTransaction) -> None: with self.tx_storage.allow_invalid_context(): self._remove_transactions(txs_to_remove, context) + pubsub_events = [] + # emit the reorg started event if needed if context.reorg_info is not None: assert isinstance(old_best_block, Block) @@ -193,26 +200,28 @@ def unsafe_update(self, base: BaseTransaction) -> None: new_best_block=new_best_block.hash_hex, common_block=context.reorg_info.common_block.hash_hex, ) - context.pubsub.publish( - HathorEvents.REORG_STARTED, - old_best_height=best_height, - old_best_block=old_best_block, - new_best_height=new_best_height, - new_best_block=new_best_block, - common_block=context.reorg_info.common_block, - reorg_size=reorg_size, - ) + pubsub_events.append(ConsensusEvent( + event=HathorEvents.REORG_STARTED, + kwargs=dict( + old_best_height=best_height, + old_best_block=old_best_block, + new_best_height=new_best_height, + new_best_block=new_best_block, + common_block=context.reorg_info.common_block, + reorg_size=reorg_size, + ) + )) # finally signal an index update for all affected transactions for tx_affected in _sorted_affected_txs(context.txs_affected): self.tx_storage.indexes.update_critical_indexes(tx_affected) with non_critical_code(self.log): self.tx_storage.indexes.update_non_critical_indexes(tx_affected) - context.pubsub.publish(HathorEvents.CONSENSUS_TX_UPDATE, tx=tx_affected) + pubsub_events.append(ConsensusEvent(event=HathorEvents.CONSENSUS_TX_UPDATE, kwargs=dict(tx=tx_affected))) # signal all transactions of which the execution succeeded for tx_nc_success in context.nc_exec_success: - context.pubsub.publish(HathorEvents.NC_EXEC_SUCCESS, tx=tx_nc_success) + pubsub_events.append(ConsensusEvent(event=HathorEvents.NC_EXEC_SUCCESS, kwargs=dict(tx=tx_nc_success))) # handle custom NC events if isinstance(base, Block): @@ -220,17 +229,21 @@ def unsafe_update(self, base: BaseTransaction) -> None: for tx, events in context.nc_events: assert tx.is_nano_contract() for event in events: - context.pubsub.publish(HathorEvents.NC_EVENT, tx=tx, nc_event=event) + pubsub_events.append( + ConsensusEvent(event=HathorEvents.NC_EVENT, kwargs=dict(tx=tx, nc_event=event)) + ) else: assert context.nc_events is None # And emit events for txs that were removed for tx_removed in txs_to_remove: - context.pubsub.publish(HathorEvents.CONSENSUS_TX_REMOVED, tx=tx_removed) + pubsub_events.append(ConsensusEvent(event=HathorEvents.CONSENSUS_TX_REMOVED, kwargs=dict(tx=tx_removed))) # and also emit the reorg finished event if needed if context.reorg_info is not None: - context.pubsub.publish(HathorEvents.REORG_FINISHED) + pubsub_events.append(ConsensusEvent(event=HathorEvents.REORG_FINISHED, kwargs={})) + + return pubsub_events def filter_out_voided_by_entries_from_parents(self, tx: BaseTransaction, voided_by: set[bytes]) -> set[bytes]: """Filter out voided_by entries that should be inherited from parents.""" diff --git a/hathor/consensus/context.py b/hathor/consensus/context.py index ae9bb2f5d..83fb92e41 100644 --- a/hathor/consensus/context.py +++ b/hathor/consensus/context.py @@ -19,7 +19,6 @@ from structlog import get_logger -from hathor.pubsub import PubSubManager from hathor.transaction import BaseTransaction, Block, Transaction if TYPE_CHECKING: @@ -45,7 +44,6 @@ class ConsensusAlgorithmContext: """ __slots__ = ( 'consensus', - 'pubsub', 'block_algorithm', 'transaction_algorithm', 'txs_affected', @@ -55,7 +53,6 @@ class ConsensusAlgorithmContext: ) consensus: 'ConsensusAlgorithm' - pubsub: PubSubManager block_algorithm: 'BlockConsensusAlgorithm' transaction_algorithm: 'TransactionConsensusAlgorithm' txs_affected: set[BaseTransaction] @@ -63,9 +60,8 @@ class ConsensusAlgorithmContext: nc_events: list[tuple[Transaction, list[NCEvent]]] | None nc_exec_success: list[Transaction] - def __init__(self, consensus: 'ConsensusAlgorithm', pubsub: PubSubManager) -> None: + def __init__(self, consensus: 'ConsensusAlgorithm') -> None: self.consensus = consensus - self.pubsub = pubsub self.block_algorithm = self.consensus.block_algorithm_factory(self) self.transaction_algorithm = self.consensus.transaction_algorithm_factory(self) self.txs_affected = set() diff --git a/hathor/manager.py b/hathor/manager.py index 98f166b36..99c5c18cc 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -51,7 +51,7 @@ from hathor.p2p.manager import ConnectionsManager from hathor.p2p.peer import PrivatePeer from hathor.p2p.peer_id import PeerId -from hathor.pubsub import HathorEvents, PubSubManager +from hathor.pubsub import EventArguments, HathorEvents, PubSubManager from hathor.reactor import ReactorProtocol as Reactor from hathor.reward_lock import is_spent_reward_locked from hathor.stratum import StratumFactory @@ -222,6 +222,7 @@ def __init__( if self.wallet: self.wallet.pubsub = self.pubsub self.wallet.reactor = self.reactor + self._subscribe_wallet(self.wallet) # It will be inject later by the builder. # XXX Remove this attribute after all dependencies are cleared. @@ -251,6 +252,14 @@ def __init__( self.lc_check_sync_state.clock = self.reactor self.lc_check_sync_state_interval = self.CHECK_SYNC_STATE_INTERVAL + def _subscribe_wallet(self, wallet: BaseWallet) -> None: + """Register a wallet on pubsub.""" + def handler(event: HathorEvents, args: EventArguments) -> None: + assert event == HathorEvents.NETWORK_NEW_TX_PROCESSING + wallet.on_new_tx(args.tx) + + self.pubsub.subscribe(HathorEvents.NETWORK_NEW_TX_PROCESSING, handler) + def get_default_capabilities(self) -> list[str]: """Return the default capabilities for this manager.""" default_capabilities = [ diff --git a/hathor/pubsub.py b/hathor/pubsub.py index 95f0f2bb7..a6f208818 100644 --- a/hathor/pubsub.py +++ b/hathor/pubsub.py @@ -36,6 +36,10 @@ class HathorEvents(Enum): """ + NETWORK_NEW_TX_PROCESSING: + Triggered when a new tx/block is received and will begin processing, just before consensus + Publishes a tx/block object + NETWORK_NEW_TX_ACCEPTED: Triggered when a new tx/block is accepted in the network Publishes a tx/block object @@ -118,6 +122,8 @@ class HathorEvents(Enum): NETWORK_PEER_DISCONNECTED = 'network:peer_disconnected' + NETWORK_NEW_TX_PROCESSING = 'network:new_tx_processing' + NETWORK_NEW_TX_ACCEPTED = 'network:new_tx_accepted' CONSENSUS_TX_UPDATE = 'consensus:tx_update' diff --git a/hathor/simulator/utils.py b/hathor/simulator/utils.py index 9afe6c464..309c467c6 100644 --- a/hathor/simulator/utils.py +++ b/hathor/simulator/utils.py @@ -53,7 +53,7 @@ def gen_new_tx(manager: HathorManager, address: str, value: int) -> Transaction: def add_new_blocks( manager: HathorManager, num_blocks: int, - advance_clock: Optional[int] = None, + advance_clock: int = 1, *, parent_block_hash: Optional[VertexId] = None, block_data: bytes = b'', @@ -85,7 +85,7 @@ def add_new_blocks( def add_new_block( manager: HathorManager, - advance_clock: Optional[int] = None, + advance_clock: int = 1, *, parent_block_hash: Optional[VertexId] = None, data: bytes = b'', diff --git a/hathor/vertex_handler/vertex_handler.py b/hathor/vertex_handler/vertex_handler.py index 9a066bef2..1435f1f1f 100644 --- a/hathor/vertex_handler/vertex_handler.py +++ b/hathor/vertex_handler/vertex_handler.py @@ -23,6 +23,7 @@ from hathor.conf.settings import HathorSettings from hathor.consensus import ConsensusAlgorithm +from hathor.consensus.consensus import ConsensusEvent from hathor.exception import HathorError, InvalidNewTransaction from hathor.execution_manager import ExecutionManager, non_critical_code from hathor.feature_activation.feature_service import FeatureService @@ -36,7 +37,6 @@ from hathor.transaction.storage.exceptions import TransactionDoesNotExist from hathor.verification.verification_params import VerificationParams from hathor.verification.verification_service import VerificationService -from hathor.wallet import BaseWallet logger = get_logger() cpu = get_cpu_profiler() @@ -53,7 +53,6 @@ class VertexHandler: '_feature_service', '_pubsub', '_execution_manager', - '_wallet', '_log_vertex_bytes', ) @@ -68,7 +67,6 @@ def __init__( feature_service: FeatureService, pubsub: PubSubManager, execution_manager: ExecutionManager, - wallet: BaseWallet | None, log_vertex_bytes: bool = False, ) -> None: self._log = logger.new() @@ -80,7 +78,6 @@ def __init__( self._feature_service = feature_service self._pubsub = pubsub self._execution_manager = execution_manager - self._wallet = wallet self._log_vertex_bytes = log_vertex_bytes @cpu.profiler('on_new_block') @@ -176,8 +173,8 @@ def _old_on_new_vertex( return False try: - self._unsafe_save_and_run_consensus(vertex) - self._post_consensus(vertex, params, quiet=quiet) + consensus_events = self._unsafe_save_and_run_consensus(vertex) + self._post_consensus(vertex, params, consensus_events, quiet=quiet) except BaseException: self._log.error('unexpected exception in on_new_vertex()', vertex=vertex) meta = vertex.get_metadata() @@ -219,7 +216,7 @@ def _validate_vertex(self, vertex: BaseTransaction, params: VerificationParams) return True - def _unsafe_save_and_run_consensus(self, vertex: BaseTransaction) -> None: + def _unsafe_save_and_run_consensus(self, vertex: BaseTransaction) -> list[ConsensusEvent]: """ This method is considered unsafe because the caller is responsible for crashing the full node if this method throws any exception. @@ -232,12 +229,13 @@ def _unsafe_save_and_run_consensus(self, vertex: BaseTransaction) -> None: self._tx_storage.save_transaction(vertex) with non_critical_code(self._log): self._tx_storage.indexes.add_to_non_critical_indexes(vertex) - self._consensus.unsafe_update(vertex) + return self._consensus.unsafe_update(vertex) def _post_consensus( self, vertex: BaseTransaction, params: VerificationParams, + consensus_events: list[ConsensusEvent], *, quiet: bool, ) -> None: @@ -258,13 +256,11 @@ def _post_consensus( with non_critical_code(self._log): self._tx_storage.indexes.update_non_critical_indexes(vertex) - # Publish to pubsub manager the new tx accepted, now that it's full validated + self._pubsub.publish(HathorEvents.NETWORK_NEW_TX_PROCESSING, tx=vertex) + for event in consensus_events: + self._pubsub.publish(event.event, **event.kwargs) self._pubsub.publish(HathorEvents.NETWORK_NEW_TX_ACCEPTED, tx=vertex) - if self._wallet: - # TODO Remove it and use pubsub instead. - self._wallet.on_new_tx(vertex) - self._log_new_object(vertex, 'new {}', quiet=quiet) def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None: diff --git a/hathor_cli/builder.py b/hathor_cli/builder.py index a30a73c53..e0bb6de07 100644 --- a/hathor_cli/builder.py +++ b/hathor_cli/builder.py @@ -249,7 +249,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager: consensus_algorithm = ConsensusAlgorithm( self.nc_storage_factory, soft_voided_tx_ids, - pubsub=pubsub, settings=settings, runner_factory=runner_factory, nc_log_storage=nc_log_storage, @@ -317,7 +316,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager: feature_service=self.feature_service, pubsub=pubsub, execution_manager=execution_manager, - wallet=self.wallet, log_vertex_bytes=self._args.log_vertex_bytes, ) diff --git a/hathor_tests/consensus/test_soft_voided.py b/hathor_tests/consensus/test_soft_voided.py index 087ac9037..4317c2203 100644 --- a/hathor_tests/consensus/test_soft_voided.py +++ b/hathor_tests/consensus/test_soft_voided.py @@ -86,6 +86,7 @@ def _run_test( assert manager2.wallet is not None address = manager2.wallet.get_unused_address(mark_as_used=False) value = 1 + simulator.run_to_completion() txC = gen_new_tx(manager2, address, value) txC.parents[0] = txA.hash txC.timestamp = max(txC.timestamp, txA.timestamp + 1) diff --git a/hathor_tests/resources/wallet/test_balance.py b/hathor_tests/resources/wallet/test_balance.py index 69740fedf..40bd9725c 100644 --- a/hathor_tests/resources/wallet/test_balance.py +++ b/hathor_tests/resources/wallet/test_balance.py @@ -30,6 +30,7 @@ def test_get(self): cpu_mining_service=CpuMiningService() ) yield self.web_mining.post("mining", {'block_bytes': base64.b64encode(block_bytes).decode('utf-8')}) + self.clock.advance(1) # Get new balance after block response2 = yield self.web.get("wallet/balance") diff --git a/hathor_tests/resources/wallet/test_history.py b/hathor_tests/resources/wallet/test_history.py index d76dee4b3..07673945c 100644 --- a/hathor_tests/resources/wallet/test_history.py +++ b/hathor_tests/resources/wallet/test_history.py @@ -25,6 +25,7 @@ def test_get(self): cpu_mining_service=CpuMiningService() ) yield self.web_mining.post("mining", {'block_bytes': base64.b64encode(block_bytes).decode('utf-8')}) + self.clock.advance(1) # Getting wallet history response = yield self.web.get("wallet/history", {b'page': 1, b'count': 10}) diff --git a/hathor_tests/tx/test_indexes.py b/hathor_tests/tx/test_indexes.py index b93cafd33..0173bd447 100644 --- a/hathor_tests/tx/test_indexes.py +++ b/hathor_tests/tx/test_indexes.py @@ -250,7 +250,7 @@ def check_utxos(*args): block2 = self.manager.generate_mining_block(parent_block_hash=block1.parents[0], address=decode_address(address)) block2.parents[1:] = [txA2.hash, txB2.hash] - block2.timestamp = block1.timestamp + block2.timestamp = block1.timestamp + 1 block2.weight = 4 self.manager.cpu_mining_service.resolve(block2) self.manager.propagate_tx(block2) diff --git a/hathor_tests/tx/test_indexes4.py b/hathor_tests/tx/test_indexes4.py index 1e8383753..7d0137214 100644 --- a/hathor_tests/tx/test_indexes4.py +++ b/hathor_tests/tx/test_indexes4.py @@ -49,6 +49,7 @@ def _build_randomized_blockchain(self, *, utxo_index=False): value = 500 tx = gen_new_tx(manager, address, value) assert manager.propagate_tx(tx) + self.clock.advance(1) return manager def test_index_initialization(self): diff --git a/hathor_tests/tx/test_multisig.py b/hathor_tests/tx/test_multisig.py index 48b35650b..bff867eaf 100644 --- a/hathor_tests/tx/test_multisig.py +++ b/hathor_tests/tx/test_multisig.py @@ -124,6 +124,7 @@ def test_spend_multisig(self): # Now we propagate the correct self.assertTrue(self.manager.propagate_tx(tx)) + self.clock.advance(1) self.assertEqual(self.manager.wallet.balance[self._settings.HATHOR_TOKEN_UID], WalletBalance(0, first_block_amount + 300)) diff --git a/hathor_tests/tx/test_reward_lock.py b/hathor_tests/tx/test_reward_lock.py index 838e1a551..55f652771 100644 --- a/hathor_tests/tx/test_reward_lock.py +++ b/hathor_tests/tx/test_reward_lock.py @@ -165,6 +165,7 @@ def test_mempool_tx_invalid_after_reorg(self) -> None: assert tx_address not in balance_per_address self.assertEqual(tx.static_metadata.min_height, unlock_height) self.assertTrue(self.manager.on_new_tx(tx)) + self.clock.advance(1) balance_per_address = self.manager.wallet.get_balance_per_address(self._settings.HATHOR_TOKEN_UID) assert balance_per_address[tx_address] == 6400 diff --git a/hathor_tests/tx/test_timelock.py b/hathor_tests/tx/test_timelock.py index 82eca54c5..faf9e8d99 100644 --- a/hathor_tests/tx/test_timelock.py +++ b/hathor_tests/tx/test_timelock.py @@ -40,6 +40,7 @@ def test_timelock(self): tx1.timestamp = int(self.clock.seconds()) self.manager.cpu_mining_service.resolve(tx1) self.manager.propagate_tx(tx1) + self.clock.advance(1) self.assertEqual(self.manager.wallet.balance[self._settings.HATHOR_TOKEN_UID], WalletBalance(500, sum(blocks_tokens) - 500)) @@ -79,6 +80,7 @@ def test_timelock(self): tx3.timestamp = int(self.clock.seconds()) self.manager.cpu_mining_service.resolve(tx3) propagated = self.manager.propagate_tx(tx3) + self.clock.advance(1) self.assertEqual(self.manager.wallet.balance[self._settings.HATHOR_TOKEN_UID], WalletBalance(500, sum(blocks_tokens) - 500 - 700)) self.assertTrue(propagated) @@ -99,6 +101,7 @@ def test_timelock(self): tx4.timestamp = int(self.clock.seconds()) self.manager.cpu_mining_service.resolve(tx4) propagated = self.manager.propagate_tx(tx4) + self.clock.advance(1) self.assertEqual(self.manager.wallet.balance[self._settings.HATHOR_TOKEN_UID], WalletBalance(500, sum(blocks_tokens[:3]))) self.assertTrue(propagated) @@ -107,6 +110,7 @@ def test_timelock(self): tx2.timestamp = int(self.clock.seconds()) self.manager.cpu_mining_service.resolve(tx2) propagated = self.manager.propagate_tx(tx2) + self.clock.advance(1) self.assertEqual(self.manager.wallet.balance[self._settings.HATHOR_TOKEN_UID], WalletBalance(0, sum(blocks_tokens[:3]))) self.assertTrue(propagated) diff --git a/hathor_tests/utils.py b/hathor_tests/utils.py index 5ddc167c9..4866987ae 100644 --- a/hathor_tests/utils.py +++ b/hathor_tests/utils.py @@ -181,7 +181,7 @@ def add_new_tx( manager: HathorManager, address: str, value: int, - advance_clock: int | None = None, + advance_clock: int = 1, propagate: bool = True, name: str | None = None, ) -> Transaction: @@ -211,7 +211,7 @@ def add_new_tx( def add_new_transactions( manager: HathorManager, num_txs: int, - advance_clock: int | None = None, + advance_clock: int = 1, propagate: bool = True, name: str | None = None, ) -> list[Transaction]: