From e6bf76e8a91571afcaaf4db6cf0e356d5e17eb3b Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Thu, 22 Jan 2026 10:28:48 -0300 Subject: [PATCH 1/4] refactor: wallet on_new_tx --- hathor/pubsub.py | 6 +++++ hathor/vertex_handler/vertex_handler.py | 22 +++++++++++++------ hathor_cli/events_simulator/scenario.py | 2 +- hathor_tests/consensus/test_soft_voided.py | 1 + hathor_tests/p2p/test_double_spending.py | 2 +- .../transaction/test_block_at_height.py | 8 +++---- .../test_transaction_confirmation.py | 2 +- hathor_tests/resources/transaction/test_tx.py | 6 +++-- hathor_tests/resources/wallet/test_balance.py | 1 + hathor_tests/resources/wallet/test_history.py | 1 + hathor_tests/tx/test_indexes.py | 2 +- hathor_tests/tx/test_indexes4.py | 1 + hathor_tests/tx/test_multisig.py | 1 + hathor_tests/tx/test_reward_lock.py | 1 + hathor_tests/tx/test_timelock.py | 4 ++++ hathor_tests/tx/test_traversal.py | 4 ++-- hathor_tests/wallet/test_wallet_hd.py | 2 +- 17 files changed, 46 insertions(+), 20 deletions(-) diff --git a/hathor/pubsub.py b/hathor/pubsub.py index 95f0f2bb7..a6f208818 100644 --- a/hathor/pubsub.py +++ b/hathor/pubsub.py @@ -36,6 +36,10 @@ class HathorEvents(Enum): """ + NETWORK_NEW_TX_PROCESSING: + Triggered when a new tx/block is received and will begin processing, just before consensus + Publishes a tx/block object + NETWORK_NEW_TX_ACCEPTED: Triggered when a new tx/block is accepted in the network Publishes a tx/block object @@ -118,6 +122,8 @@ class HathorEvents(Enum): NETWORK_PEER_DISCONNECTED = 'network:peer_disconnected' + NETWORK_NEW_TX_PROCESSING = 'network:new_tx_processing' + NETWORK_NEW_TX_ACCEPTED = 'network:new_tx_accepted' CONSENSUS_TX_UPDATE = 'consensus:tx_update' diff --git a/hathor/vertex_handler/vertex_handler.py b/hathor/vertex_handler/vertex_handler.py index 9a066bef2..6705edb1d 100644 --- a/hathor/vertex_handler/vertex_handler.py +++ b/hathor/vertex_handler/vertex_handler.py @@ -28,7 +28,7 @@ from hathor.feature_activation.feature_service import FeatureService from hathor.feature_activation.utils import Features from hathor.profiler import get_cpu_profiler -from hathor.pubsub import HathorEvents, PubSubManager +from hathor.pubsub import EventArguments, HathorEvents, PubSubManager from hathor.reactor import ReactorProtocol from hathor.transaction import BaseTransaction, Block, Transaction from hathor.transaction.scripts.opcode import OpcodesVersion @@ -53,7 +53,6 @@ class VertexHandler: '_feature_service', '_pubsub', '_execution_manager', - '_wallet', '_log_vertex_bytes', ) @@ -80,9 +79,21 @@ def __init__( self._feature_service = feature_service self._pubsub = pubsub self._execution_manager = execution_manager - self._wallet = wallet self._log_vertex_bytes = log_vertex_bytes + self._register_wallet(wallet) + + def _register_wallet(self, wallet: BaseWallet | None) -> None: + """Register a wallet on pubsub.""" + if wallet is None: + return + + def handler(event: HathorEvents, args: EventArguments) -> None: + assert event == HathorEvents.NETWORK_NEW_TX_PROCESSING + wallet.on_new_tx(args.tx) + + self._pubsub.subscribe(HathorEvents.NETWORK_NEW_TX_PROCESSING, handler) + @cpu.profiler('on_new_block') @inlineCallbacks def on_new_block(self, block: Block, *, deps: list[Transaction]) -> Generator[Any, Any, bool]: @@ -232,6 +243,7 @@ def _unsafe_save_and_run_consensus(self, vertex: BaseTransaction) -> None: self._tx_storage.save_transaction(vertex) with non_critical_code(self._log): self._tx_storage.indexes.add_to_non_critical_indexes(vertex) + self._pubsub.publish(HathorEvents.NETWORK_NEW_TX_PROCESSING, tx=vertex) self._consensus.unsafe_update(vertex) def _post_consensus( @@ -261,10 +273,6 @@ def _post_consensus( # Publish to pubsub manager the new tx accepted, now that it's full validated self._pubsub.publish(HathorEvents.NETWORK_NEW_TX_ACCEPTED, tx=vertex) - if self._wallet: - # TODO Remove it and use pubsub instead. - self._wallet.on_new_tx(vertex) - self._log_new_object(vertex, 'new {}', quiet=quiet) def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None: diff --git a/hathor_cli/events_simulator/scenario.py b/hathor_cli/events_simulator/scenario.py index 600ed217f..340af4427 100644 --- a/hathor_cli/events_simulator/scenario.py +++ b/hathor_cli/events_simulator/scenario.py @@ -177,7 +177,7 @@ def simulate_invalid_mempool_transaction(simulator: 'Simulator', manager: 'Hatho assert manager.wallet is not None address = manager.wallet.get_unused_address(mark_as_used=False) - blocks = add_new_blocks(manager, settings.REWARD_SPEND_MIN_BLOCKS + 1) + blocks = add_new_blocks(manager, settings.REWARD_SPEND_MIN_BLOCKS + 1, advance_clock=1) simulator.run(60) balance_per_address = manager.wallet.get_balance_per_address(settings.HATHOR_TOKEN_UID) diff --git a/hathor_tests/consensus/test_soft_voided.py b/hathor_tests/consensus/test_soft_voided.py index 087ac9037..4317c2203 100644 --- a/hathor_tests/consensus/test_soft_voided.py +++ b/hathor_tests/consensus/test_soft_voided.py @@ -86,6 +86,7 @@ def _run_test( assert manager2.wallet is not None address = manager2.wallet.get_unused_address(mark_as_used=False) value = 1 + simulator.run_to_completion() txC = gen_new_tx(manager2, address, value) txC.parents[0] = txA.hash txC.timestamp = max(txC.timestamp, txA.timestamp + 1) diff --git a/hathor_tests/p2p/test_double_spending.py b/hathor_tests/p2p/test_double_spending.py index a61d01a2d..7e689873a 100644 --- a/hathor_tests/p2p/test_double_spending.py +++ b/hathor_tests/p2p/test_double_spending.py @@ -24,7 +24,7 @@ def _add_new_transactions(self, manager: HathorManager, num_txs: int) -> list[Tr for _ in range(num_txs): address = not_none(self.get_address(0)) value = self.rng.choice([5, 10, 15, 20]) - tx = add_new_tx(manager, address, value) + tx = add_new_tx(manager, address, value, advance_clock=1) txs.append(tx) return txs diff --git a/hathor_tests/resources/transaction/test_block_at_height.py b/hathor_tests/resources/transaction/test_block_at_height.py index 7a19967ba..172a474c2 100644 --- a/hathor_tests/resources/transaction/test_block_at_height.py +++ b/hathor_tests/resources/transaction/test_block_at_height.py @@ -20,14 +20,14 @@ def test_include_full(self): confirmed_tx_list = [] for _ in range(15): - confirmed_tx_list.append(add_new_tx(self.manager, address, 1)) + confirmed_tx_list.append(add_new_tx(self.manager, address, 1, advance_clock=1)) block = add_new_block(self.manager, advance_clock=1) height = block.get_height() # non-confirmed transactions for _ in range(15): - add_new_tx(self.manager, address, 1) + add_new_tx(self.manager, address, 1, advance_clock=1) response = yield self.web.get("block_at_height", { b'height': str(height).encode('ascii'), @@ -48,14 +48,14 @@ def test_include_txids(self): confirmed_tx_list = [] for _ in range(15): - confirmed_tx_list.append(add_new_tx(self.manager, address, 1)) + confirmed_tx_list.append(add_new_tx(self.manager, address, 1, advance_clock=1)) block = add_new_block(self.manager, advance_clock=1) height = block.get_height() # non-confirmed transactions for _ in range(15): - add_new_tx(self.manager, address, 1) + add_new_tx(self.manager, address, 1, advance_clock=1) response = yield self.web.get("block_at_height", { b'height': str(height).encode('ascii'), diff --git a/hathor_tests/resources/transaction/test_transaction_confirmation.py b/hathor_tests/resources/transaction/test_transaction_confirmation.py index 178f9a939..15686981f 100644 --- a/hathor_tests/resources/transaction/test_transaction_confirmation.py +++ b/hathor_tests/resources/transaction/test_transaction_confirmation.py @@ -27,7 +27,7 @@ def test_get_data(self): # Adding blocks to have funds add_new_blocks(self.manager, 2, advance_clock=1) add_blocks_unlock_reward(self.manager) - tx = add_new_transactions(self.manager, 5)[0] + tx = add_new_transactions(self.manager, 5, advance_clock=1)[0] add_new_blocks(self.manager, 2, advance_clock=1) add_blocks_unlock_reward(self.manager) response_success2 = yield self.web.get( diff --git a/hathor_tests/resources/transaction/test_tx.py b/hathor_tests/resources/transaction/test_tx.py index 3a461e1bd..1adffa449 100644 --- a/hathor_tests/resources/transaction/test_tx.py +++ b/hathor_tests/resources/transaction/test_tx.py @@ -292,7 +292,9 @@ def test_get_many(self): # Add some blocks and txs and get them in timestamp order blocks = add_new_blocks(self.manager, 4, advance_clock=1) _blocks = add_blocks_unlock_reward(self.manager) - txs = sorted(add_new_transactions(self.manager, 25), key=lambda x: (x.timestamp, x.hash)) + txs = sorted( + add_new_transactions(self.manager, 25, advance_clock=1), key=lambda x: (x.timestamp, x.hash) + ) blocks.extend(_blocks) blocks = sorted(blocks, key=lambda x: (x.timestamp, x.hash)) @@ -484,7 +486,7 @@ def test_negative_timestamp(self): # Add some blocks and txs and get them in timestamp order blocks = add_new_blocks(self.manager, 4, advance_clock=1) add_blocks_unlock_reward(self.manager) - add_new_transactions(self.manager, 25) + add_new_transactions(self.manager, 25, advance_clock=1) response = yield self.web.get( "transaction", { diff --git a/hathor_tests/resources/wallet/test_balance.py b/hathor_tests/resources/wallet/test_balance.py index 69740fedf..40bd9725c 100644 --- a/hathor_tests/resources/wallet/test_balance.py +++ b/hathor_tests/resources/wallet/test_balance.py @@ -30,6 +30,7 @@ def test_get(self): cpu_mining_service=CpuMiningService() ) yield self.web_mining.post("mining", {'block_bytes': base64.b64encode(block_bytes).decode('utf-8')}) + self.clock.advance(1) # Get new balance after block response2 = yield self.web.get("wallet/balance") diff --git a/hathor_tests/resources/wallet/test_history.py b/hathor_tests/resources/wallet/test_history.py index d76dee4b3..07673945c 100644 --- a/hathor_tests/resources/wallet/test_history.py +++ b/hathor_tests/resources/wallet/test_history.py @@ -25,6 +25,7 @@ def test_get(self): cpu_mining_service=CpuMiningService() ) yield self.web_mining.post("mining", {'block_bytes': base64.b64encode(block_bytes).decode('utf-8')}) + self.clock.advance(1) # Getting wallet history response = yield self.web.get("wallet/history", {b'page': 1, b'count': 10}) diff --git a/hathor_tests/tx/test_indexes.py b/hathor_tests/tx/test_indexes.py index b93cafd33..abed13b06 100644 --- a/hathor_tests/tx/test_indexes.py +++ b/hathor_tests/tx/test_indexes.py @@ -362,7 +362,7 @@ def test_utxo_index_limits(self): txs = [] values = list(range(1, 301)) for value in values: - txs.append(add_new_tx(self.manager, address, value)) + txs.append(add_new_tx(self.manager, address, value, advance_clock=1)) assert len(txs) == len(values) txs_and_values = list(zip(txs, values)) diff --git a/hathor_tests/tx/test_indexes4.py b/hathor_tests/tx/test_indexes4.py index 1e8383753..7d0137214 100644 --- a/hathor_tests/tx/test_indexes4.py +++ b/hathor_tests/tx/test_indexes4.py @@ -49,6 +49,7 @@ def _build_randomized_blockchain(self, *, utxo_index=False): value = 500 tx = gen_new_tx(manager, address, value) assert manager.propagate_tx(tx) + self.clock.advance(1) return manager def test_index_initialization(self): diff --git a/hathor_tests/tx/test_multisig.py b/hathor_tests/tx/test_multisig.py index 48b35650b..bff867eaf 100644 --- a/hathor_tests/tx/test_multisig.py +++ b/hathor_tests/tx/test_multisig.py @@ -124,6 +124,7 @@ def test_spend_multisig(self): # Now we propagate the correct self.assertTrue(self.manager.propagate_tx(tx)) + self.clock.advance(1) self.assertEqual(self.manager.wallet.balance[self._settings.HATHOR_TOKEN_UID], WalletBalance(0, first_block_amount + 300)) diff --git a/hathor_tests/tx/test_reward_lock.py b/hathor_tests/tx/test_reward_lock.py index 838e1a551..55f652771 100644 --- a/hathor_tests/tx/test_reward_lock.py +++ b/hathor_tests/tx/test_reward_lock.py @@ -165,6 +165,7 @@ def test_mempool_tx_invalid_after_reorg(self) -> None: assert tx_address not in balance_per_address self.assertEqual(tx.static_metadata.min_height, unlock_height) self.assertTrue(self.manager.on_new_tx(tx)) + self.clock.advance(1) balance_per_address = self.manager.wallet.get_balance_per_address(self._settings.HATHOR_TOKEN_UID) assert balance_per_address[tx_address] == 6400 diff --git a/hathor_tests/tx/test_timelock.py b/hathor_tests/tx/test_timelock.py index 82eca54c5..faf9e8d99 100644 --- a/hathor_tests/tx/test_timelock.py +++ b/hathor_tests/tx/test_timelock.py @@ -40,6 +40,7 @@ def test_timelock(self): tx1.timestamp = int(self.clock.seconds()) self.manager.cpu_mining_service.resolve(tx1) self.manager.propagate_tx(tx1) + self.clock.advance(1) self.assertEqual(self.manager.wallet.balance[self._settings.HATHOR_TOKEN_UID], WalletBalance(500, sum(blocks_tokens) - 500)) @@ -79,6 +80,7 @@ def test_timelock(self): tx3.timestamp = int(self.clock.seconds()) self.manager.cpu_mining_service.resolve(tx3) propagated = self.manager.propagate_tx(tx3) + self.clock.advance(1) self.assertEqual(self.manager.wallet.balance[self._settings.HATHOR_TOKEN_UID], WalletBalance(500, sum(blocks_tokens) - 500 - 700)) self.assertTrue(propagated) @@ -99,6 +101,7 @@ def test_timelock(self): tx4.timestamp = int(self.clock.seconds()) self.manager.cpu_mining_service.resolve(tx4) propagated = self.manager.propagate_tx(tx4) + self.clock.advance(1) self.assertEqual(self.manager.wallet.balance[self._settings.HATHOR_TOKEN_UID], WalletBalance(500, sum(blocks_tokens[:3]))) self.assertTrue(propagated) @@ -107,6 +110,7 @@ def test_timelock(self): tx2.timestamp = int(self.clock.seconds()) self.manager.cpu_mining_service.resolve(tx2) propagated = self.manager.propagate_tx(tx2) + self.clock.advance(1) self.assertEqual(self.manager.wallet.balance[self._settings.HATHOR_TOKEN_UID], WalletBalance(0, sum(blocks_tokens[:3]))) self.assertTrue(propagated) diff --git a/hathor_tests/tx/test_traversal.py b/hathor_tests/tx/test_traversal.py index d8a538e78..eaa4776e2 100644 --- a/hathor_tests/tx/test_traversal.py +++ b/hathor_tests/tx/test_traversal.py @@ -20,7 +20,7 @@ def setUp(self): self.blocks_before = add_new_blocks(self.manager, 3, advance_clock=1) self.blocks_before.extend(add_blocks_unlock_reward(self.manager)) - self.txs_before = add_new_transactions(self.manager, 5) + self.txs_before = add_new_transactions(self.manager, 5, advance_clock=1) for block in self.blocks_before: self.hashes_before.add(block.hash) for tx in self.txs_before: @@ -30,7 +30,7 @@ def setUp(self): self.root_tx = add_new_tx(self.manager, address=address, value=100) self.blocks_after = add_blocks_unlock_reward(self.manager) - self.txs_after = add_new_transactions(self.manager, 5) + self.txs_after = add_new_transactions(self.manager, 5, advance_clock=1) self.blocks_after.extend(add_new_blocks(self.manager, 3, advance_clock=1)) self.hashes_after = set() diff --git a/hathor_tests/wallet/test_wallet_hd.py b/hathor_tests/wallet/test_wallet_hd.py index 60dc0d104..c54a80409 100644 --- a/hathor_tests/wallet/test_wallet_hd.py +++ b/hathor_tests/wallet/test_wallet_hd.py @@ -29,7 +29,7 @@ def test_transaction_and_balance(self): # generate a new block and check if we increase balance new_address = self.wallet.get_unused_address() out = WalletOutputInfo(decode_address(new_address), self.TOKENS, timelock=None) - block = add_new_block(self.manager) + block = add_new_block(self.manager, advance_clock=1) self.manager.verification_service.verify(block, self.get_verification_params(self.manager)) utxo = self.wallet.unspent_txs[self._settings.HATHOR_TOKEN_UID].get((block.hash, 0)) self.assertIsNotNone(utxo) From e16e128be166afdeb0f4db6352569b01ffdb4720 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Mon, 26 Jan 2026 12:30:45 -0300 Subject: [PATCH 2/4] review changes --- hathor/builder/builder.py | 3 -- hathor/consensus/consensus.py | 53 ++++++++++++------- hathor/consensus/context.py | 6 +-- hathor/manager.py | 11 +++- hathor/simulator/utils.py | 2 +- hathor/vertex_handler/vertex_handler.py | 32 ++++------- hathor_cli/builder.py | 2 - hathor_cli/events_simulator/scenario.py | 2 +- hathor_tests/p2p/test_double_spending.py | 2 +- .../transaction/test_block_at_height.py | 8 +-- .../test_transaction_confirmation.py | 2 +- hathor_tests/resources/transaction/test_tx.py | 6 +-- hathor_tests/tx/test_indexes.py | 2 +- hathor_tests/tx/test_traversal.py | 4 +- hathor_tests/utils.py | 4 +- hathor_tests/wallet/test_wallet_hd.py | 2 +- 16 files changed, 70 insertions(+), 71 deletions(-) diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index 2fcf4b243..e15e8c914 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -411,13 +411,11 @@ def _get_or_create_nc_log_storage(self) -> NCLogStorage: def _get_or_create_consensus(self) -> ConsensusAlgorithm: if self._consensus is None: soft_voided_tx_ids = self._get_soft_voided_tx_ids() - pubsub = self._get_or_create_pubsub() nc_storage_factory = self._get_or_create_nc_storage_factory() nc_calls_sorter = self._get_nc_calls_sorter() self._consensus = ConsensusAlgorithm( nc_storage_factory=nc_storage_factory, soft_voided_tx_ids=soft_voided_tx_ids, - pubsub=pubsub, settings=self._get_or_create_settings(), runner_factory=self._get_or_create_runner_factory(), nc_log_storage=self._get_or_create_nc_log_storage(), @@ -657,7 +655,6 @@ def _get_or_create_vertex_handler(self) -> VertexHandler: feature_service=self._get_or_create_feature_service(), execution_manager=self._get_or_create_execution_manager(), pubsub=self._get_or_create_pubsub(), - wallet=self._get_or_create_wallet(), ) return self._vertex_handler diff --git a/hathor/consensus/consensus.py b/hathor/consensus/consensus.py index eaf1cda3b..dbf1b0fda 100644 --- a/hathor/consensus/consensus.py +++ b/hathor/consensus/consensus.py @@ -15,7 +15,8 @@ from __future__ import annotations from collections import defaultdict -from typing import TYPE_CHECKING, Callable, assert_never +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Callable, assert_never from structlog import get_logger @@ -27,7 +28,7 @@ from hathor.nanocontracts.exception import NCInvalidSignature from hathor.nanocontracts.execution import NCBlockExecutor from hathor.profiler import get_cpu_profiler -from hathor.pubsub import HathorEvents, PubSubManager +from hathor.pubsub import HathorEvents from hathor.transaction import BaseTransaction, Block, Transaction from hathor.transaction.exceptions import InvalidInputData, RewardLocked, TooManySigOps from hathor.util import not_none @@ -48,6 +49,12 @@ _base_transaction_log = logger.new() +@dataclass(slots=True, frozen=True, kw_only=True) +class ConsensusEvent: + event: HathorEvents + kwargs: dict[str, Any] + + class ConsensusAlgorithm: """Execute the consensus algorithm marking blocks and transactions as either executed or voided. @@ -78,7 +85,6 @@ def __init__( self, nc_storage_factory: 'NCStorageFactory', soft_voided_tx_ids: set[bytes], - pubsub: PubSubManager, *, settings: HathorSettings, tx_storage: TransactionStorage, @@ -90,7 +96,6 @@ def __init__( ) -> None: self._settings = settings self.log = logger.new() - self._pubsub = pubsub self.tx_storage = tx_storage self.nc_storage_factory = nc_storage_factory self.soft_voided_tx_ids = frozenset(soft_voided_tx_ids) @@ -114,10 +119,10 @@ def __init__( def create_context(self) -> ConsensusAlgorithmContext: """Handy method to create a context that can be used to access block and transaction algorithms.""" - return ConsensusAlgorithmContext(self, self._pubsub) + return ConsensusAlgorithmContext(self) @cpu.profiler(key=lambda self, base: 'consensus!{}'.format(base.hash.hex())) - def unsafe_update(self, base: BaseTransaction) -> None: + def unsafe_update(self, base: BaseTransaction) -> list[ConsensusEvent]: """ Run a consensus update with its own context, indexes will be updated accordingly. @@ -177,6 +182,8 @@ def unsafe_update(self, base: BaseTransaction) -> None: with self.tx_storage.allow_invalid_context(): self._remove_transactions(txs_to_remove, context) + pubsub_events = [] + # emit the reorg started event if needed if context.reorg_info is not None: assert isinstance(old_best_block, Block) @@ -193,26 +200,28 @@ def unsafe_update(self, base: BaseTransaction) -> None: new_best_block=new_best_block.hash_hex, common_block=context.reorg_info.common_block.hash_hex, ) - context.pubsub.publish( - HathorEvents.REORG_STARTED, - old_best_height=best_height, - old_best_block=old_best_block, - new_best_height=new_best_height, - new_best_block=new_best_block, - common_block=context.reorg_info.common_block, - reorg_size=reorg_size, - ) + pubsub_events.append(ConsensusEvent( + event=HathorEvents.REORG_STARTED, + kwargs=dict( + old_best_height=best_height, + old_best_block=old_best_block, + new_best_height=new_best_height, + new_best_block=new_best_block, + common_block=context.reorg_info.common_block, + reorg_size=reorg_size, + ) + )) # finally signal an index update for all affected transactions for tx_affected in _sorted_affected_txs(context.txs_affected): self.tx_storage.indexes.update_critical_indexes(tx_affected) with non_critical_code(self.log): self.tx_storage.indexes.update_non_critical_indexes(tx_affected) - context.pubsub.publish(HathorEvents.CONSENSUS_TX_UPDATE, tx=tx_affected) + pubsub_events.append(ConsensusEvent(event=HathorEvents.CONSENSUS_TX_UPDATE, kwargs=dict(tx=tx_affected))) # signal all transactions of which the execution succeeded for tx_nc_success in context.nc_exec_success: - context.pubsub.publish(HathorEvents.NC_EXEC_SUCCESS, tx=tx_nc_success) + pubsub_events.append(ConsensusEvent(event=HathorEvents.NC_EXEC_SUCCESS, kwargs=dict(tx=tx_nc_success))) # handle custom NC events if isinstance(base, Block): @@ -220,17 +229,21 @@ def unsafe_update(self, base: BaseTransaction) -> None: for tx, events in context.nc_events: assert tx.is_nano_contract() for event in events: - context.pubsub.publish(HathorEvents.NC_EVENT, tx=tx, nc_event=event) + pubsub_events.append( + ConsensusEvent(event=HathorEvents.NC_EVENT, kwargs=dict(tx=tx, nc_event=event)) + ) else: assert context.nc_events is None # And emit events for txs that were removed for tx_removed in txs_to_remove: - context.pubsub.publish(HathorEvents.CONSENSUS_TX_REMOVED, tx=tx_removed) + pubsub_events.append(ConsensusEvent(event=HathorEvents.CONSENSUS_TX_REMOVED, kwargs=dict(tx=tx_removed))) # and also emit the reorg finished event if needed if context.reorg_info is not None: - context.pubsub.publish(HathorEvents.REORG_FINISHED) + pubsub_events.append(ConsensusEvent(event=HathorEvents.REORG_FINISHED, kwargs={})) + + return pubsub_events def filter_out_voided_by_entries_from_parents(self, tx: BaseTransaction, voided_by: set[bytes]) -> set[bytes]: """Filter out voided_by entries that should be inherited from parents.""" diff --git a/hathor/consensus/context.py b/hathor/consensus/context.py index ae9bb2f5d..83fb92e41 100644 --- a/hathor/consensus/context.py +++ b/hathor/consensus/context.py @@ -19,7 +19,6 @@ from structlog import get_logger -from hathor.pubsub import PubSubManager from hathor.transaction import BaseTransaction, Block, Transaction if TYPE_CHECKING: @@ -45,7 +44,6 @@ class ConsensusAlgorithmContext: """ __slots__ = ( 'consensus', - 'pubsub', 'block_algorithm', 'transaction_algorithm', 'txs_affected', @@ -55,7 +53,6 @@ class ConsensusAlgorithmContext: ) consensus: 'ConsensusAlgorithm' - pubsub: PubSubManager block_algorithm: 'BlockConsensusAlgorithm' transaction_algorithm: 'TransactionConsensusAlgorithm' txs_affected: set[BaseTransaction] @@ -63,9 +60,8 @@ class ConsensusAlgorithmContext: nc_events: list[tuple[Transaction, list[NCEvent]]] | None nc_exec_success: list[Transaction] - def __init__(self, consensus: 'ConsensusAlgorithm', pubsub: PubSubManager) -> None: + def __init__(self, consensus: 'ConsensusAlgorithm') -> None: self.consensus = consensus - self.pubsub = pubsub self.block_algorithm = self.consensus.block_algorithm_factory(self) self.transaction_algorithm = self.consensus.transaction_algorithm_factory(self) self.txs_affected = set() diff --git a/hathor/manager.py b/hathor/manager.py index 98f166b36..99c5c18cc 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -51,7 +51,7 @@ from hathor.p2p.manager import ConnectionsManager from hathor.p2p.peer import PrivatePeer from hathor.p2p.peer_id import PeerId -from hathor.pubsub import HathorEvents, PubSubManager +from hathor.pubsub import EventArguments, HathorEvents, PubSubManager from hathor.reactor import ReactorProtocol as Reactor from hathor.reward_lock import is_spent_reward_locked from hathor.stratum import StratumFactory @@ -222,6 +222,7 @@ def __init__( if self.wallet: self.wallet.pubsub = self.pubsub self.wallet.reactor = self.reactor + self._subscribe_wallet(self.wallet) # It will be inject later by the builder. # XXX Remove this attribute after all dependencies are cleared. @@ -251,6 +252,14 @@ def __init__( self.lc_check_sync_state.clock = self.reactor self.lc_check_sync_state_interval = self.CHECK_SYNC_STATE_INTERVAL + def _subscribe_wallet(self, wallet: BaseWallet) -> None: + """Register a wallet on pubsub.""" + def handler(event: HathorEvents, args: EventArguments) -> None: + assert event == HathorEvents.NETWORK_NEW_TX_PROCESSING + wallet.on_new_tx(args.tx) + + self.pubsub.subscribe(HathorEvents.NETWORK_NEW_TX_PROCESSING, handler) + def get_default_capabilities(self) -> list[str]: """Return the default capabilities for this manager.""" default_capabilities = [ diff --git a/hathor/simulator/utils.py b/hathor/simulator/utils.py index 9afe6c464..11ce939a5 100644 --- a/hathor/simulator/utils.py +++ b/hathor/simulator/utils.py @@ -53,7 +53,7 @@ def gen_new_tx(manager: HathorManager, address: str, value: int) -> Transaction: def add_new_blocks( manager: HathorManager, num_blocks: int, - advance_clock: Optional[int] = None, + advance_clock: int = 1, *, parent_block_hash: Optional[VertexId] = None, block_data: bytes = b'', diff --git a/hathor/vertex_handler/vertex_handler.py b/hathor/vertex_handler/vertex_handler.py index 6705edb1d..1435f1f1f 100644 --- a/hathor/vertex_handler/vertex_handler.py +++ b/hathor/vertex_handler/vertex_handler.py @@ -23,12 +23,13 @@ from hathor.conf.settings import HathorSettings from hathor.consensus import ConsensusAlgorithm +from hathor.consensus.consensus import ConsensusEvent from hathor.exception import HathorError, InvalidNewTransaction from hathor.execution_manager import ExecutionManager, non_critical_code from hathor.feature_activation.feature_service import FeatureService from hathor.feature_activation.utils import Features from hathor.profiler import get_cpu_profiler -from hathor.pubsub import EventArguments, HathorEvents, PubSubManager +from hathor.pubsub import HathorEvents, PubSubManager from hathor.reactor import ReactorProtocol from hathor.transaction import BaseTransaction, Block, Transaction from hathor.transaction.scripts.opcode import OpcodesVersion @@ -36,7 +37,6 @@ from hathor.transaction.storage.exceptions import TransactionDoesNotExist from hathor.verification.verification_params import VerificationParams from hathor.verification.verification_service import VerificationService -from hathor.wallet import BaseWallet logger = get_logger() cpu = get_cpu_profiler() @@ -67,7 +67,6 @@ def __init__( feature_service: FeatureService, pubsub: PubSubManager, execution_manager: ExecutionManager, - wallet: BaseWallet | None, log_vertex_bytes: bool = False, ) -> None: self._log = logger.new() @@ -81,19 +80,6 @@ def __init__( self._execution_manager = execution_manager self._log_vertex_bytes = log_vertex_bytes - self._register_wallet(wallet) - - def _register_wallet(self, wallet: BaseWallet | None) -> None: - """Register a wallet on pubsub.""" - if wallet is None: - return - - def handler(event: HathorEvents, args: EventArguments) -> None: - assert event == HathorEvents.NETWORK_NEW_TX_PROCESSING - wallet.on_new_tx(args.tx) - - self._pubsub.subscribe(HathorEvents.NETWORK_NEW_TX_PROCESSING, handler) - @cpu.profiler('on_new_block') @inlineCallbacks def on_new_block(self, block: Block, *, deps: list[Transaction]) -> Generator[Any, Any, bool]: @@ -187,8 +173,8 @@ def _old_on_new_vertex( return False try: - self._unsafe_save_and_run_consensus(vertex) - self._post_consensus(vertex, params, quiet=quiet) + consensus_events = self._unsafe_save_and_run_consensus(vertex) + self._post_consensus(vertex, params, consensus_events, quiet=quiet) except BaseException: self._log.error('unexpected exception in on_new_vertex()', vertex=vertex) meta = vertex.get_metadata() @@ -230,7 +216,7 @@ def _validate_vertex(self, vertex: BaseTransaction, params: VerificationParams) return True - def _unsafe_save_and_run_consensus(self, vertex: BaseTransaction) -> None: + def _unsafe_save_and_run_consensus(self, vertex: BaseTransaction) -> list[ConsensusEvent]: """ This method is considered unsafe because the caller is responsible for crashing the full node if this method throws any exception. @@ -243,13 +229,13 @@ def _unsafe_save_and_run_consensus(self, vertex: BaseTransaction) -> None: self._tx_storage.save_transaction(vertex) with non_critical_code(self._log): self._tx_storage.indexes.add_to_non_critical_indexes(vertex) - self._pubsub.publish(HathorEvents.NETWORK_NEW_TX_PROCESSING, tx=vertex) - self._consensus.unsafe_update(vertex) + return self._consensus.unsafe_update(vertex) def _post_consensus( self, vertex: BaseTransaction, params: VerificationParams, + consensus_events: list[ConsensusEvent], *, quiet: bool, ) -> None: @@ -270,7 +256,9 @@ def _post_consensus( with non_critical_code(self._log): self._tx_storage.indexes.update_non_critical_indexes(vertex) - # Publish to pubsub manager the new tx accepted, now that it's full validated + self._pubsub.publish(HathorEvents.NETWORK_NEW_TX_PROCESSING, tx=vertex) + for event in consensus_events: + self._pubsub.publish(event.event, **event.kwargs) self._pubsub.publish(HathorEvents.NETWORK_NEW_TX_ACCEPTED, tx=vertex) self._log_new_object(vertex, 'new {}', quiet=quiet) diff --git a/hathor_cli/builder.py b/hathor_cli/builder.py index a30a73c53..e0bb6de07 100644 --- a/hathor_cli/builder.py +++ b/hathor_cli/builder.py @@ -249,7 +249,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager: consensus_algorithm = ConsensusAlgorithm( self.nc_storage_factory, soft_voided_tx_ids, - pubsub=pubsub, settings=settings, runner_factory=runner_factory, nc_log_storage=nc_log_storage, @@ -317,7 +316,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager: feature_service=self.feature_service, pubsub=pubsub, execution_manager=execution_manager, - wallet=self.wallet, log_vertex_bytes=self._args.log_vertex_bytes, ) diff --git a/hathor_cli/events_simulator/scenario.py b/hathor_cli/events_simulator/scenario.py index 340af4427..600ed217f 100644 --- a/hathor_cli/events_simulator/scenario.py +++ b/hathor_cli/events_simulator/scenario.py @@ -177,7 +177,7 @@ def simulate_invalid_mempool_transaction(simulator: 'Simulator', manager: 'Hatho assert manager.wallet is not None address = manager.wallet.get_unused_address(mark_as_used=False) - blocks = add_new_blocks(manager, settings.REWARD_SPEND_MIN_BLOCKS + 1, advance_clock=1) + blocks = add_new_blocks(manager, settings.REWARD_SPEND_MIN_BLOCKS + 1) simulator.run(60) balance_per_address = manager.wallet.get_balance_per_address(settings.HATHOR_TOKEN_UID) diff --git a/hathor_tests/p2p/test_double_spending.py b/hathor_tests/p2p/test_double_spending.py index 7e689873a..a61d01a2d 100644 --- a/hathor_tests/p2p/test_double_spending.py +++ b/hathor_tests/p2p/test_double_spending.py @@ -24,7 +24,7 @@ def _add_new_transactions(self, manager: HathorManager, num_txs: int) -> list[Tr for _ in range(num_txs): address = not_none(self.get_address(0)) value = self.rng.choice([5, 10, 15, 20]) - tx = add_new_tx(manager, address, value, advance_clock=1) + tx = add_new_tx(manager, address, value) txs.append(tx) return txs diff --git a/hathor_tests/resources/transaction/test_block_at_height.py b/hathor_tests/resources/transaction/test_block_at_height.py index 172a474c2..7a19967ba 100644 --- a/hathor_tests/resources/transaction/test_block_at_height.py +++ b/hathor_tests/resources/transaction/test_block_at_height.py @@ -20,14 +20,14 @@ def test_include_full(self): confirmed_tx_list = [] for _ in range(15): - confirmed_tx_list.append(add_new_tx(self.manager, address, 1, advance_clock=1)) + confirmed_tx_list.append(add_new_tx(self.manager, address, 1)) block = add_new_block(self.manager, advance_clock=1) height = block.get_height() # non-confirmed transactions for _ in range(15): - add_new_tx(self.manager, address, 1, advance_clock=1) + add_new_tx(self.manager, address, 1) response = yield self.web.get("block_at_height", { b'height': str(height).encode('ascii'), @@ -48,14 +48,14 @@ def test_include_txids(self): confirmed_tx_list = [] for _ in range(15): - confirmed_tx_list.append(add_new_tx(self.manager, address, 1, advance_clock=1)) + confirmed_tx_list.append(add_new_tx(self.manager, address, 1)) block = add_new_block(self.manager, advance_clock=1) height = block.get_height() # non-confirmed transactions for _ in range(15): - add_new_tx(self.manager, address, 1, advance_clock=1) + add_new_tx(self.manager, address, 1) response = yield self.web.get("block_at_height", { b'height': str(height).encode('ascii'), diff --git a/hathor_tests/resources/transaction/test_transaction_confirmation.py b/hathor_tests/resources/transaction/test_transaction_confirmation.py index 15686981f..178f9a939 100644 --- a/hathor_tests/resources/transaction/test_transaction_confirmation.py +++ b/hathor_tests/resources/transaction/test_transaction_confirmation.py @@ -27,7 +27,7 @@ def test_get_data(self): # Adding blocks to have funds add_new_blocks(self.manager, 2, advance_clock=1) add_blocks_unlock_reward(self.manager) - tx = add_new_transactions(self.manager, 5, advance_clock=1)[0] + tx = add_new_transactions(self.manager, 5)[0] add_new_blocks(self.manager, 2, advance_clock=1) add_blocks_unlock_reward(self.manager) response_success2 = yield self.web.get( diff --git a/hathor_tests/resources/transaction/test_tx.py b/hathor_tests/resources/transaction/test_tx.py index 1adffa449..3a461e1bd 100644 --- a/hathor_tests/resources/transaction/test_tx.py +++ b/hathor_tests/resources/transaction/test_tx.py @@ -292,9 +292,7 @@ def test_get_many(self): # Add some blocks and txs and get them in timestamp order blocks = add_new_blocks(self.manager, 4, advance_clock=1) _blocks = add_blocks_unlock_reward(self.manager) - txs = sorted( - add_new_transactions(self.manager, 25, advance_clock=1), key=lambda x: (x.timestamp, x.hash) - ) + txs = sorted(add_new_transactions(self.manager, 25), key=lambda x: (x.timestamp, x.hash)) blocks.extend(_blocks) blocks = sorted(blocks, key=lambda x: (x.timestamp, x.hash)) @@ -486,7 +484,7 @@ def test_negative_timestamp(self): # Add some blocks and txs and get them in timestamp order blocks = add_new_blocks(self.manager, 4, advance_clock=1) add_blocks_unlock_reward(self.manager) - add_new_transactions(self.manager, 25, advance_clock=1) + add_new_transactions(self.manager, 25) response = yield self.web.get( "transaction", { diff --git a/hathor_tests/tx/test_indexes.py b/hathor_tests/tx/test_indexes.py index abed13b06..b93cafd33 100644 --- a/hathor_tests/tx/test_indexes.py +++ b/hathor_tests/tx/test_indexes.py @@ -362,7 +362,7 @@ def test_utxo_index_limits(self): txs = [] values = list(range(1, 301)) for value in values: - txs.append(add_new_tx(self.manager, address, value, advance_clock=1)) + txs.append(add_new_tx(self.manager, address, value)) assert len(txs) == len(values) txs_and_values = list(zip(txs, values)) diff --git a/hathor_tests/tx/test_traversal.py b/hathor_tests/tx/test_traversal.py index eaa4776e2..d8a538e78 100644 --- a/hathor_tests/tx/test_traversal.py +++ b/hathor_tests/tx/test_traversal.py @@ -20,7 +20,7 @@ def setUp(self): self.blocks_before = add_new_blocks(self.manager, 3, advance_clock=1) self.blocks_before.extend(add_blocks_unlock_reward(self.manager)) - self.txs_before = add_new_transactions(self.manager, 5, advance_clock=1) + self.txs_before = add_new_transactions(self.manager, 5) for block in self.blocks_before: self.hashes_before.add(block.hash) for tx in self.txs_before: @@ -30,7 +30,7 @@ def setUp(self): self.root_tx = add_new_tx(self.manager, address=address, value=100) self.blocks_after = add_blocks_unlock_reward(self.manager) - self.txs_after = add_new_transactions(self.manager, 5, advance_clock=1) + self.txs_after = add_new_transactions(self.manager, 5) self.blocks_after.extend(add_new_blocks(self.manager, 3, advance_clock=1)) self.hashes_after = set() diff --git a/hathor_tests/utils.py b/hathor_tests/utils.py index 5ddc167c9..4866987ae 100644 --- a/hathor_tests/utils.py +++ b/hathor_tests/utils.py @@ -181,7 +181,7 @@ def add_new_tx( manager: HathorManager, address: str, value: int, - advance_clock: int | None = None, + advance_clock: int = 1, propagate: bool = True, name: str | None = None, ) -> Transaction: @@ -211,7 +211,7 @@ def add_new_tx( def add_new_transactions( manager: HathorManager, num_txs: int, - advance_clock: int | None = None, + advance_clock: int = 1, propagate: bool = True, name: str | None = None, ) -> list[Transaction]: diff --git a/hathor_tests/wallet/test_wallet_hd.py b/hathor_tests/wallet/test_wallet_hd.py index c54a80409..60dc0d104 100644 --- a/hathor_tests/wallet/test_wallet_hd.py +++ b/hathor_tests/wallet/test_wallet_hd.py @@ -29,7 +29,7 @@ def test_transaction_and_balance(self): # generate a new block and check if we increase balance new_address = self.wallet.get_unused_address() out = WalletOutputInfo(decode_address(new_address), self.TOKENS, timelock=None) - block = add_new_block(self.manager, advance_clock=1) + block = add_new_block(self.manager) self.manager.verification_service.verify(block, self.get_verification_params(self.manager)) utxo = self.wallet.unspent_txs[self._settings.HATHOR_TOKEN_UID].get((block.hash, 0)) self.assertIsNotNone(utxo) From f5c6f535f820351ea4b22f418c7bf913ce237d30 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Mon, 26 Jan 2026 13:01:29 -0300 Subject: [PATCH 3/4] review changes --- hathor/simulator/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hathor/simulator/utils.py b/hathor/simulator/utils.py index 11ce939a5..309c467c6 100644 --- a/hathor/simulator/utils.py +++ b/hathor/simulator/utils.py @@ -85,7 +85,7 @@ def add_new_blocks( def add_new_block( manager: HathorManager, - advance_clock: Optional[int] = None, + advance_clock: int = 1, *, parent_block_hash: Optional[VertexId] = None, data: bytes = b'', From 0e8fa622bf582fd524394b7938d84db56932716f Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Mon, 26 Jan 2026 13:30:38 -0300 Subject: [PATCH 4/4] fix tests --- hathor_tests/tx/test_indexes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hathor_tests/tx/test_indexes.py b/hathor_tests/tx/test_indexes.py index b93cafd33..0173bd447 100644 --- a/hathor_tests/tx/test_indexes.py +++ b/hathor_tests/tx/test_indexes.py @@ -250,7 +250,7 @@ def check_utxos(*args): block2 = self.manager.generate_mining_block(parent_block_hash=block1.parents[0], address=decode_address(address)) block2.parents[1:] = [txA2.hash, txB2.hash] - block2.timestamp = block1.timestamp + block2.timestamp = block1.timestamp + 1 block2.weight = 4 self.manager.cpu_mining_service.resolve(block2) self.manager.propagate_tx(block2)