Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,11 @@ def _get_or_create_nc_log_storage(self) -> NCLogStorage:
def _get_or_create_consensus(self) -> ConsensusAlgorithm:
if self._consensus is None:
soft_voided_tx_ids = self._get_soft_voided_tx_ids()
pubsub = self._get_or_create_pubsub()
nc_storage_factory = self._get_or_create_nc_storage_factory()
nc_calls_sorter = self._get_nc_calls_sorter()
self._consensus = ConsensusAlgorithm(
nc_storage_factory=nc_storage_factory,
soft_voided_tx_ids=soft_voided_tx_ids,
pubsub=pubsub,
settings=self._get_or_create_settings(),
runner_factory=self._get_or_create_runner_factory(),
nc_log_storage=self._get_or_create_nc_log_storage(),
Expand Down Expand Up @@ -657,7 +655,6 @@ def _get_or_create_vertex_handler(self) -> VertexHandler:
feature_service=self._get_or_create_feature_service(),
execution_manager=self._get_or_create_execution_manager(),
pubsub=self._get_or_create_pubsub(),
wallet=self._get_or_create_wallet(),
)

return self._vertex_handler
Expand Down
53 changes: 33 additions & 20 deletions hathor/consensus/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
from __future__ import annotations

from collections import defaultdict
from typing import TYPE_CHECKING, Callable, assert_never
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, assert_never

from structlog import get_logger

Expand All @@ -27,7 +28,7 @@
from hathor.nanocontracts.exception import NCInvalidSignature
from hathor.nanocontracts.execution import NCBlockExecutor
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.pubsub import HathorEvents
from hathor.transaction import BaseTransaction, Block, Transaction
from hathor.transaction.exceptions import InvalidInputData, RewardLocked, TooManySigOps
from hathor.util import not_none
Expand All @@ -48,6 +49,12 @@
_base_transaction_log = logger.new()


@dataclass(slots=True, frozen=True, kw_only=True)
class ConsensusEvent:
event: HathorEvents
kwargs: dict[str, Any]


class ConsensusAlgorithm:
"""Execute the consensus algorithm marking blocks and transactions as either executed or voided.

Expand Down Expand Up @@ -78,7 +85,6 @@ def __init__(
self,
nc_storage_factory: 'NCStorageFactory',
soft_voided_tx_ids: set[bytes],
pubsub: PubSubManager,
*,
settings: HathorSettings,
tx_storage: TransactionStorage,
Expand All @@ -90,7 +96,6 @@ def __init__(
) -> None:
self._settings = settings
self.log = logger.new()
self._pubsub = pubsub
self.tx_storage = tx_storage
self.nc_storage_factory = nc_storage_factory
self.soft_voided_tx_ids = frozenset(soft_voided_tx_ids)
Expand All @@ -114,10 +119,10 @@ def __init__(

def create_context(self) -> ConsensusAlgorithmContext:
"""Handy method to create a context that can be used to access block and transaction algorithms."""
return ConsensusAlgorithmContext(self, self._pubsub)
return ConsensusAlgorithmContext(self)

@cpu.profiler(key=lambda self, base: 'consensus!{}'.format(base.hash.hex()))
def unsafe_update(self, base: BaseTransaction) -> None:
def unsafe_update(self, base: BaseTransaction) -> list[ConsensusEvent]:
"""
Run a consensus update with its own context, indexes will be updated accordingly.

Expand Down Expand Up @@ -177,6 +182,8 @@ def unsafe_update(self, base: BaseTransaction) -> None:
with self.tx_storage.allow_invalid_context():
self._remove_transactions(txs_to_remove, context)

pubsub_events = []

# emit the reorg started event if needed
if context.reorg_info is not None:
assert isinstance(old_best_block, Block)
Expand All @@ -193,44 +200,50 @@ def unsafe_update(self, base: BaseTransaction) -> None:
new_best_block=new_best_block.hash_hex,
common_block=context.reorg_info.common_block.hash_hex,
)
context.pubsub.publish(
HathorEvents.REORG_STARTED,
old_best_height=best_height,
old_best_block=old_best_block,
new_best_height=new_best_height,
new_best_block=new_best_block,
common_block=context.reorg_info.common_block,
reorg_size=reorg_size,
)
pubsub_events.append(ConsensusEvent(
event=HathorEvents.REORG_STARTED,
kwargs=dict(
old_best_height=best_height,
old_best_block=old_best_block,
new_best_height=new_best_height,
new_best_block=new_best_block,
common_block=context.reorg_info.common_block,
reorg_size=reorg_size,
)
))

# finally signal an index update for all affected transactions
for tx_affected in _sorted_affected_txs(context.txs_affected):
self.tx_storage.indexes.update_critical_indexes(tx_affected)
with non_critical_code(self.log):
self.tx_storage.indexes.update_non_critical_indexes(tx_affected)
context.pubsub.publish(HathorEvents.CONSENSUS_TX_UPDATE, tx=tx_affected)
pubsub_events.append(ConsensusEvent(event=HathorEvents.CONSENSUS_TX_UPDATE, kwargs=dict(tx=tx_affected)))

# signal all transactions of which the execution succeeded
for tx_nc_success in context.nc_exec_success:
context.pubsub.publish(HathorEvents.NC_EXEC_SUCCESS, tx=tx_nc_success)
pubsub_events.append(ConsensusEvent(event=HathorEvents.NC_EXEC_SUCCESS, kwargs=dict(tx=tx_nc_success)))

# handle custom NC events
if isinstance(base, Block):
assert context.nc_events is not None
for tx, events in context.nc_events:
assert tx.is_nano_contract()
for event in events:
context.pubsub.publish(HathorEvents.NC_EVENT, tx=tx, nc_event=event)
pubsub_events.append(
ConsensusEvent(event=HathorEvents.NC_EVENT, kwargs=dict(tx=tx, nc_event=event))
)
else:
assert context.nc_events is None

# And emit events for txs that were removed
for tx_removed in txs_to_remove:
context.pubsub.publish(HathorEvents.CONSENSUS_TX_REMOVED, tx=tx_removed)
pubsub_events.append(ConsensusEvent(event=HathorEvents.CONSENSUS_TX_REMOVED, kwargs=dict(tx=tx_removed)))

# and also emit the reorg finished event if needed
if context.reorg_info is not None:
context.pubsub.publish(HathorEvents.REORG_FINISHED)
pubsub_events.append(ConsensusEvent(event=HathorEvents.REORG_FINISHED, kwargs={}))

return pubsub_events

def filter_out_voided_by_entries_from_parents(self, tx: BaseTransaction, voided_by: set[bytes]) -> set[bytes]:
"""Filter out voided_by entries that should be inherited from parents."""
Expand Down
6 changes: 1 addition & 5 deletions hathor/consensus/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

from structlog import get_logger

from hathor.pubsub import PubSubManager
from hathor.transaction import BaseTransaction, Block, Transaction

if TYPE_CHECKING:
Expand All @@ -45,7 +44,6 @@ class ConsensusAlgorithmContext:
"""
__slots__ = (
'consensus',
'pubsub',
'block_algorithm',
'transaction_algorithm',
'txs_affected',
Expand All @@ -55,17 +53,15 @@ class ConsensusAlgorithmContext:
)

consensus: 'ConsensusAlgorithm'
pubsub: PubSubManager
block_algorithm: 'BlockConsensusAlgorithm'
transaction_algorithm: 'TransactionConsensusAlgorithm'
txs_affected: set[BaseTransaction]
reorg_info: ReorgInfo | None
nc_events: list[tuple[Transaction, list[NCEvent]]] | None
nc_exec_success: list[Transaction]

def __init__(self, consensus: 'ConsensusAlgorithm', pubsub: PubSubManager) -> None:
def __init__(self, consensus: 'ConsensusAlgorithm') -> None:
self.consensus = consensus
self.pubsub = pubsub
self.block_algorithm = self.consensus.block_algorithm_factory(self)
self.transaction_algorithm = self.consensus.transaction_algorithm_factory(self)
self.txs_affected = set()
Expand Down
11 changes: 10 additions & 1 deletion hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.peer_id import PeerId
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.pubsub import EventArguments, HathorEvents, PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
from hathor.reward_lock import is_spent_reward_locked
from hathor.stratum import StratumFactory
Expand Down Expand Up @@ -222,6 +222,7 @@ def __init__(
if self.wallet:
self.wallet.pubsub = self.pubsub
self.wallet.reactor = self.reactor
self._subscribe_wallet(self.wallet)

# It will be inject later by the builder.
# XXX Remove this attribute after all dependencies are cleared.
Expand Down Expand Up @@ -251,6 +252,14 @@ def __init__(
self.lc_check_sync_state.clock = self.reactor
self.lc_check_sync_state_interval = self.CHECK_SYNC_STATE_INTERVAL

def _subscribe_wallet(self, wallet: BaseWallet) -> None:
"""Register a wallet on pubsub."""
def handler(event: HathorEvents, args: EventArguments) -> None:
assert event == HathorEvents.NETWORK_NEW_TX_PROCESSING
wallet.on_new_tx(args.tx)

self.pubsub.subscribe(HathorEvents.NETWORK_NEW_TX_PROCESSING, handler)

def get_default_capabilities(self) -> list[str]:
"""Return the default capabilities for this manager."""
default_capabilities = [
Expand Down
6 changes: 6 additions & 0 deletions hathor/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@

class HathorEvents(Enum):
"""
NETWORK_NEW_TX_PROCESSING:
Triggered when a new tx/block is received and will begin processing, just before consensus
Publishes a tx/block object

NETWORK_NEW_TX_ACCEPTED:
Triggered when a new tx/block is accepted in the network
Publishes a tx/block object
Expand Down Expand Up @@ -118,6 +122,8 @@ class HathorEvents(Enum):

NETWORK_PEER_DISCONNECTED = 'network:peer_disconnected'

NETWORK_NEW_TX_PROCESSING = 'network:new_tx_processing'

NETWORK_NEW_TX_ACCEPTED = 'network:new_tx_accepted'

CONSENSUS_TX_UPDATE = 'consensus:tx_update'
Expand Down
4 changes: 2 additions & 2 deletions hathor/simulator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def gen_new_tx(manager: HathorManager, address: str, value: int) -> Transaction:
def add_new_blocks(
manager: HathorManager,
num_blocks: int,
advance_clock: Optional[int] = None,
advance_clock: int = 1,
*,
parent_block_hash: Optional[VertexId] = None,
block_data: bytes = b'',
Expand Down Expand Up @@ -85,7 +85,7 @@ def add_new_blocks(

def add_new_block(
manager: HathorManager,
advance_clock: Optional[int] = None,
advance_clock: int = 1,
*,
parent_block_hash: Optional[VertexId] = None,
data: bytes = b'',
Expand Down
22 changes: 9 additions & 13 deletions hathor/vertex_handler/vertex_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from hathor.conf.settings import HathorSettings
from hathor.consensus import ConsensusAlgorithm
from hathor.consensus.consensus import ConsensusEvent
from hathor.exception import HathorError, InvalidNewTransaction
from hathor.execution_manager import ExecutionManager, non_critical_code
from hathor.feature_activation.feature_service import FeatureService
Expand All @@ -36,7 +37,6 @@
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.verification.verification_params import VerificationParams
from hathor.verification.verification_service import VerificationService
from hathor.wallet import BaseWallet

logger = get_logger()
cpu = get_cpu_profiler()
Expand All @@ -53,7 +53,6 @@ class VertexHandler:
'_feature_service',
'_pubsub',
'_execution_manager',
'_wallet',
'_log_vertex_bytes',
)

Expand All @@ -68,7 +67,6 @@ def __init__(
feature_service: FeatureService,
pubsub: PubSubManager,
execution_manager: ExecutionManager,
wallet: BaseWallet | None,
log_vertex_bytes: bool = False,
) -> None:
self._log = logger.new()
Expand All @@ -80,7 +78,6 @@ def __init__(
self._feature_service = feature_service
self._pubsub = pubsub
self._execution_manager = execution_manager
self._wallet = wallet
self._log_vertex_bytes = log_vertex_bytes

@cpu.profiler('on_new_block')
Expand Down Expand Up @@ -176,8 +173,8 @@ def _old_on_new_vertex(
return False

try:
self._unsafe_save_and_run_consensus(vertex)
self._post_consensus(vertex, params, quiet=quiet)
consensus_events = self._unsafe_save_and_run_consensus(vertex)
self._post_consensus(vertex, params, consensus_events, quiet=quiet)
except BaseException:
self._log.error('unexpected exception in on_new_vertex()', vertex=vertex)
meta = vertex.get_metadata()
Expand Down Expand Up @@ -219,7 +216,7 @@ def _validate_vertex(self, vertex: BaseTransaction, params: VerificationParams)

return True

def _unsafe_save_and_run_consensus(self, vertex: BaseTransaction) -> None:
def _unsafe_save_and_run_consensus(self, vertex: BaseTransaction) -> list[ConsensusEvent]:
"""
This method is considered unsafe because the caller is responsible for crashing the full node
if this method throws any exception.
Expand All @@ -232,12 +229,13 @@ def _unsafe_save_and_run_consensus(self, vertex: BaseTransaction) -> None:
self._tx_storage.save_transaction(vertex)
with non_critical_code(self._log):
self._tx_storage.indexes.add_to_non_critical_indexes(vertex)
self._consensus.unsafe_update(vertex)
return self._consensus.unsafe_update(vertex)

def _post_consensus(
self,
vertex: BaseTransaction,
params: VerificationParams,
consensus_events: list[ConsensusEvent],
*,
quiet: bool,
) -> None:
Expand All @@ -258,13 +256,11 @@ def _post_consensus(
with non_critical_code(self._log):
self._tx_storage.indexes.update_non_critical_indexes(vertex)

# Publish to pubsub manager the new tx accepted, now that it's full validated
self._pubsub.publish(HathorEvents.NETWORK_NEW_TX_PROCESSING, tx=vertex)
for event in consensus_events:
self._pubsub.publish(event.event, **event.kwargs)
self._pubsub.publish(HathorEvents.NETWORK_NEW_TX_ACCEPTED, tx=vertex)

if self._wallet:
# TODO Remove it and use pubsub instead.
self._wallet.on_new_tx(vertex)

self._log_new_object(vertex, 'new {}', quiet=quiet)

def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None:
Expand Down
2 changes: 0 additions & 2 deletions hathor_cli/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
consensus_algorithm = ConsensusAlgorithm(
self.nc_storage_factory,
soft_voided_tx_ids,
pubsub=pubsub,
settings=settings,
runner_factory=runner_factory,
nc_log_storage=nc_log_storage,
Expand Down Expand Up @@ -317,7 +316,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
feature_service=self.feature_service,
pubsub=pubsub,
execution_manager=execution_manager,
wallet=self.wallet,
log_vertex_bytes=self._args.log_vertex_bytes,
)

Expand Down
1 change: 1 addition & 0 deletions hathor_tests/consensus/test_soft_voided.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def _run_test(
assert manager2.wallet is not None
address = manager2.wallet.get_unused_address(mark_as_used=False)
value = 1
simulator.run_to_completion()
txC = gen_new_tx(manager2, address, value)
txC.parents[0] = txA.hash
txC.timestamp = max(txC.timestamp, txA.timestamp + 1)
Expand Down
1 change: 1 addition & 0 deletions hathor_tests/resources/wallet/test_balance.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def test_get(self):
cpu_mining_service=CpuMiningService()
)
yield self.web_mining.post("mining", {'block_bytes': base64.b64encode(block_bytes).decode('utf-8')})
self.clock.advance(1)

# Get new balance after block
response2 = yield self.web.get("wallet/balance")
Expand Down
1 change: 1 addition & 0 deletions hathor_tests/resources/wallet/test_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def test_get(self):
cpu_mining_service=CpuMiningService()
)
yield self.web_mining.post("mining", {'block_bytes': base64.b64encode(block_bytes).decode('utf-8')})
self.clock.advance(1)

# Getting wallet history
response = yield self.web.get("wallet/history", {b'page': 1, b'count': 10})
Expand Down
Loading