diff --git a/hathor/consensus/poa/__init__.py b/hathor/consensus/poa/__init__.py index df6973854..084ceec8a 100644 --- a/hathor/consensus/poa/__init__.py +++ b/hathor/consensus/poa/__init__.py @@ -7,7 +7,7 @@ calculate_weight, get_active_signers, get_hashed_poa_data, - in_turn_signer_index, + get_signer_index_distance, verify_poa_signature, ) from .poa_block_producer import PoaBlockProducer @@ -18,7 +18,6 @@ 'BLOCK_WEIGHT_OUT_OF_TURN', 'SIGNER_ID_LEN', 'get_hashed_poa_data', - 'in_turn_signer_index', 'calculate_weight', 'PoaBlockProducer', 'PoaSigner', @@ -27,4 +26,5 @@ 'InvalidSignature', 'ValidSignature', 'get_active_signers', + 'get_signer_index_distance', ] diff --git a/hathor/consensus/poa/poa.py b/hathor/consensus/poa/poa.py index 57ef4fd2c..03ecbf3a5 100644 --- a/hathor/consensus/poa/poa.py +++ b/hathor/consensus/poa/poa.py @@ -55,16 +55,21 @@ def get_active_signers(settings: PoaSettings, height: int) -> list[bytes]: return active_signers -def in_turn_signer_index(settings: PoaSettings, height: int) -> int: - """Return the signer index that is in turn for the given height.""" +def get_signer_index_distance(*, settings: PoaSettings, signer_index: int, height: int) -> int: + """Considering a block height, return the signer index distance to that block. When the distance is 0, it means it + is the signer's turn.""" active_signers = get_active_signers(settings, height) - return height % len(active_signers) + expected_index = height % len(active_signers) + signers = get_active_signers(settings, height) + index_distance = (signer_index - expected_index) % len(signers) + assert 0 <= index_distance < len(signers) + return index_distance def calculate_weight(settings: PoaSettings, block: PoaBlock, signer_index: int) -> float: """Return the weight for the given block and signer.""" - expected_index = in_turn_signer_index(settings, block.get_height()) - return BLOCK_WEIGHT_IN_TURN if expected_index == signer_index else BLOCK_WEIGHT_OUT_OF_TURN + index_distance = get_signer_index_distance(settings=settings, signer_index=signer_index, height=block.get_height()) + return BLOCK_WEIGHT_IN_TURN if index_distance == 0 else BLOCK_WEIGHT_OUT_OF_TURN / index_distance @dataclass(frozen=True, slots=True) diff --git a/hathor/consensus/poa/poa_block_producer.py b/hathor/consensus/poa/poa_block_producer.py index ca7a1a151..267e7792c 100644 --- a/hathor/consensus/poa/poa_block_producer.py +++ b/hathor/consensus/poa/poa_block_producer.py @@ -24,6 +24,7 @@ from hathor.consensus import poa from hathor.consensus.consensus_settings import PoaSettings from hathor.crypto.util import get_public_key_bytes_compressed +from hathor.pubsub import EventArguments, HathorEvents from hathor.reactor import ReactorProtocol from hathor.util import not_none @@ -35,11 +36,8 @@ logger = get_logger() -# Number of seconds to wait for a sync to finish before trying to produce blocks -_WAIT_SYNC_DELAY: int = 30 - # Number of seconds used between each signer depending on its distance to the expected signer -_SIGNER_TURN_INTERVAL: int = 1 +_SIGNER_TURN_INTERVAL: int = 10 class PoaBlockProducer: @@ -54,11 +52,9 @@ class PoaBlockProducer: '_reactor', '_manager', '_poa_signer', - '_started_producing', - '_start_producing_lc', - '_schedule_block_lc', '_last_seen_best_block', '_delayed_call', + '_start_producing_lc', ) def __init__(self, *, settings: HathorSettings, reactor: ReactorProtocol, poa_signer: PoaSigner) -> None: @@ -70,14 +66,9 @@ def __init__(self, *, settings: HathorSettings, reactor: ReactorProtocol, poa_si self._manager: HathorManager | None = None self._poa_signer = poa_signer self._last_seen_best_block: Block | None = None - - self._started_producing = False - self._start_producing_lc = LoopingCall(self._start_producing) - self._start_producing_lc.clock = self._reactor - - self._schedule_block_lc = LoopingCall(self._schedule_block) - self._schedule_block_lc.clock = self._reactor self._delayed_call: IDelayedCall | None = None + self._start_producing_lc: LoopingCall = LoopingCall(self._safe_start_producing) + self._start_producing_lc.clock = self._reactor @property def manager(self) -> HathorManager: @@ -89,19 +80,18 @@ def manager(self, manager: HathorManager) -> None: self._manager = manager def start(self) -> None: - self._start_producing_lc.start(_WAIT_SYNC_DELAY) - self._schedule_block_lc.start(self._settings.AVG_TIME_BETWEEN_BLOCKS) + self.manager.pubsub.subscribe(HathorEvents.NETWORK_NEW_TX_ACCEPTED, self._on_new_vertex) + self._start_producing_lc.start(self._settings.AVG_TIME_BETWEEN_BLOCKS) def stop(self) -> None: - if self._start_producing_lc.running: - self._start_producing_lc.stop() - - if self._schedule_block_lc.running: - self._schedule_block_lc.stop() + self.manager.pubsub.unsubscribe(HathorEvents.NETWORK_NEW_TX_ACCEPTED, self._on_new_vertex) if self._delayed_call and self._delayed_call.active(): self._delayed_call.cancel() + if self._start_producing_lc.running: + self._start_producing_lc.stop() + def _get_signer_index(self, previous_block: Block) -> int | None: """Return our signer index considering the active signers.""" height = previous_block.get_height() + 1 @@ -113,7 +103,13 @@ def _get_signer_index(self, previous_block: Block) -> int | None: except ValueError: return None - def _start_producing(self) -> None: + def _safe_start_producing(self) -> None: + try: + return self._unsafe_start_producing() + except Exception: + self._log.exception('error while trying to start block production') + + def _unsafe_start_producing(self) -> None: """Start producing new blocks.""" if not self.manager.can_start_mining(): # We're syncing, so we'll try again later @@ -121,13 +117,35 @@ def _start_producing(self) -> None: return self._log.info('started producing new blocks') - self._started_producing = True - self._start_producing_lc.stop() + self._schedule_block() + + def _on_new_vertex(self, event: HathorEvents, args: EventArguments) -> None: + """Handle propagation of new blocks after a vertex is received.""" + assert event is HathorEvents.NETWORK_NEW_TX_ACCEPTED + block = args.tx + + from hathor.transaction import Block + if not isinstance(block, Block): + return + + from hathor.transaction.poa import PoaBlock + if isinstance(block, PoaBlock) and not block.weight == poa.BLOCK_WEIGHT_IN_TURN: + self._log.info('received out of turn block', block=block.hash_hex, signer_id=block.signer_id) + + self._schedule_block() def _schedule_block(self) -> None: """Schedule propagation of a new block.""" + if not self.manager.can_start_mining(): + # We're syncing, so we'll try again later + self._log.info('cannot produce new block, node not synced') + return + + if self._start_producing_lc.running: + self._start_producing_lc.stop() + previous_block = self.manager.tx_storage.get_best_block() - if not self._started_producing or previous_block == self._last_seen_best_block: + if previous_block == self._last_seen_best_block: return self._last_seen_best_block = previous_block @@ -139,7 +157,11 @@ def _schedule_block(self) -> None: expected_timestamp = self._expected_block_timestamp(previous_block, signer_index) propagation_delay = 0 if expected_timestamp < now else expected_timestamp - now + if self._delayed_call and self._delayed_call.active(): + self._delayed_call.cancel() + self._delayed_call = self._reactor.callLater(propagation_delay, self._produce_block, previous_block) + self._log.debug( 'scheduling block production', previous_block=previous_block.hash_hex, @@ -153,16 +175,16 @@ def _produce_block(self, previous_block: PoaBlock) -> None: block_templates = self.manager.get_block_templates(parent_block_hash=previous_block.hash) block = block_templates.generate_mining_block(self.manager.rng, cls=PoaBlock) assert isinstance(block, PoaBlock) + + if block.get_height() <= self.manager.tx_storage.get_height_best_block(): + return + signer_index = self._get_signer_index(previous_block) block.weight = poa.calculate_weight(self._poa_settings, block, not_none(signer_index)) self._poa_signer.sign_block(block) block.update_hash() - self.manager.on_new_tx(block, propagate_to_peers=False, fails_silently=False) - if not block.get_metadata().voided_by: - self.manager.connections.send_tx_to_peers(block) - - self._log.debug( + self._log.info( 'produced new block', block=block.hash_hex, height=block.get_height(), @@ -170,13 +192,18 @@ def _produce_block(self, previous_block: PoaBlock) -> None: parent=block.get_block_parent_hash().hex(), voided=bool(block.get_metadata().voided_by), ) + self.manager.on_new_tx(block, propagate_to_peers=True, fails_silently=False) def _expected_block_timestamp(self, previous_block: Block, signer_index: int) -> int: """Calculate the expected timestamp for a new block.""" height = previous_block.get_height() + 1 - expected_index = poa.in_turn_signer_index(settings=self._poa_settings, height=height) - signers = poa.get_active_signers(self._poa_settings, height) - index_distance = (signer_index - expected_index) % len(signers) - assert 0 <= index_distance < len(signers) + index_distance = poa.get_signer_index_distance( + settings=self._poa_settings, + signer_index=signer_index, + height=height, + ) delay = _SIGNER_TURN_INTERVAL * index_distance + if index_distance > 0: + # if it's not our turn, we add a constant offset to the delay + delay += self._settings.AVG_TIME_BETWEEN_BLOCKS return previous_block.timestamp + self._settings.AVG_TIME_BETWEEN_BLOCKS + delay diff --git a/tests/poa/test_poa.py b/tests/poa/test_poa.py index 3026701ff..3ab188975 100644 --- a/tests/poa/test_poa.py +++ b/tests/poa/test_poa.py @@ -218,40 +218,55 @@ def get_signer() -> tuple[PoaSigner, bytes]: @pytest.mark.parametrize( ['n_signers', 'height', 'signer_index', 'expected'], [ - (1, 1, 0, True), - (1, 2, 0, True), - (1, 3, 0, True), - - (2, 1, 0, False), - (2, 2, 0, True), - (2, 3, 0, False), - - (2, 1, 1, True), - (2, 2, 1, False), - (2, 3, 1, True), + (1, 1, 0, 0), + (1, 2, 0, 0), + (1, 3, 0, 0), + + (2, 1, 0, 1), + (2, 2, 0, 0), + (2, 3, 0, 1), + + (2, 1, 1, 0), + (2, 2, 1, 1), + (2, 3, 1, 0), + + (5, 1, 0, 4), + (5, 2, 0, 3), + (5, 3, 0, 2), + (5, 4, 0, 1), + (5, 5, 0, 0), ] ) -def test_in_turn_signer_index(n_signers: int, height: int, signer_index: int, expected: bool) -> None: +def test_get_signer_index_distance(n_signers: int, height: int, signer_index: int, expected: int) -> None: settings = PoaSettings.construct(signers=tuple(PoaSignerSettings(public_key=b'') for _ in range(n_signers))) - result = poa.in_turn_signer_index(settings=settings, height=height) == signer_index + result = poa.get_signer_index_distance(settings=settings, signer_index=signer_index, height=height) assert result == expected @pytest.mark.parametrize( ['n_signers', 'height', 'signer_index', 'expected'], [ + (1, 0, 0, poa.BLOCK_WEIGHT_IN_TURN), (1, 1, 0, poa.BLOCK_WEIGHT_IN_TURN), (1, 2, 0, poa.BLOCK_WEIGHT_IN_TURN), (1, 3, 0, poa.BLOCK_WEIGHT_IN_TURN), + (2, 0, 0, poa.BLOCK_WEIGHT_IN_TURN), (2, 1, 0, poa.BLOCK_WEIGHT_OUT_OF_TURN), (2, 2, 0, poa.BLOCK_WEIGHT_IN_TURN), (2, 3, 0, poa.BLOCK_WEIGHT_OUT_OF_TURN), + (2, 0, 1, poa.BLOCK_WEIGHT_OUT_OF_TURN), (2, 1, 1, poa.BLOCK_WEIGHT_IN_TURN), (2, 2, 1, poa.BLOCK_WEIGHT_OUT_OF_TURN), (2, 3, 1, poa.BLOCK_WEIGHT_IN_TURN), + + (5, 0, 0, poa.BLOCK_WEIGHT_IN_TURN), + (5, 1, 0, poa.BLOCK_WEIGHT_OUT_OF_TURN / 4), + (5, 2, 0, poa.BLOCK_WEIGHT_OUT_OF_TURN / 3), + (5, 3, 0, poa.BLOCK_WEIGHT_OUT_OF_TURN / 2), + (5, 4, 0, poa.BLOCK_WEIGHT_OUT_OF_TURN / 1), ] ) def test_calculate_weight(n_signers: int, height: int, signer_index: int, expected: float) -> None: diff --git a/tests/poa/test_poa_block_producer.py b/tests/poa/test_poa_block_producer.py index ac71b1ba9..3be849470 100644 --- a/tests/poa/test_poa_block_producer.py +++ b/tests/poa/test_poa_block_producer.py @@ -57,7 +57,6 @@ def test_poa_block_producer_one_signer() -> None: # when we can start mining, we start producing blocks manager.can_start_mining = Mock(return_value=True) - reactor.advance(20) # we produce our first block reactor.advance(10) @@ -116,7 +115,6 @@ def test_poa_block_producer_two_signers() -> None: # when we can start mining, we start producing blocks manager.can_start_mining = Mock(return_value=True) - reactor.advance(20) # we produce our first block reactor.advance(10) @@ -144,14 +142,14 @@ def test_poa_block_producer_two_signers() -> None: manager.on_new_tx.reset_mock() # haven't produced the third block yet - reactor.advance(9) + reactor.advance(29) # we produce our third block - reactor.advance(2) + reactor.advance(1) manager.on_new_tx.assert_called_once() block3 = manager.on_new_tx.call_args.args[0] assert isinstance(block3, PoaBlock) - assert block3.timestamp == block2.timestamp + 11 + assert block3.timestamp == block2.timestamp + 30 assert block3.weight == poa.BLOCK_WEIGHT_OUT_OF_TURN assert block3.outputs == [] assert block3.get_block_parent_hash() == block2.hash @@ -161,15 +159,15 @@ def test_poa_block_producer_two_signers() -> None: @pytest.mark.parametrize( ['previous_height', 'signer_index', 'expected_delay'], [ - (0, 0, 33), + (0, 0, 90), (0, 1, 30), - (0, 2, 31), - (0, 3, 32), + (0, 2, 70), + (0, 3, 80), - (1, 0, 32), - (1, 1, 33), + (1, 0, 80), + (1, 1, 90), (1, 2, 30), - (1, 3, 31), + (1, 3, 70), ] ) def test_expected_block_timestamp(previous_height: int, signer_index: int, expected_delay: int) -> None: diff --git a/tests/poa/test_poa_simulation.py b/tests/poa/test_poa_simulation.py index eb5907162..33c455426 100644 --- a/tests/poa/test_poa_simulation.py +++ b/tests/poa/test_poa_simulation.py @@ -20,6 +20,7 @@ import pytest from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec +from twisted.python.failure import Failure from hathor.conf.settings import HathorSettings from hathor.consensus import poa @@ -117,7 +118,7 @@ def test_one_producer_allowed(self) -> None: # manager is allowed to produce blocks, so it does manager.allow_mining_without_peers() - self.simulator.run(120) + self.simulator.run(90) assert manager.tx_storage.get_block_count() == 10 _assert_height_weight_signer_id( @@ -155,9 +156,9 @@ def test_two_producers(self) -> None: self.simulator.add_connection(connection) # both managers are producing blocks - self.simulator.run(125) - assert manager1.tx_storage.get_block_count() == 16 - assert manager2.tx_storage.get_block_count() == 17 + self.simulator.run(100) + assert manager1.tx_storage.get_block_count() == 12 + assert manager2.tx_storage.get_block_count() == 12 assert manager1.tx_storage.get_best_block_tips() == manager2.tx_storage.get_best_block_tips() _assert_height_weight_signer_id( @@ -195,16 +196,53 @@ def test_two_producers(self) -> None: if height % 2 == 0: # if the height is even, it's manager1's turn. - # manager2 will produce its block too, but it'll be voided and not propagated. assert len(blocks_manager1) == 1 - assert len(blocks_manager2) == 2 _assert_block_in_turn(blocks_manager1[0], signer1) else: # if the height is odd, the opposite happens - assert len(blocks_manager1) == 2 assert len(blocks_manager2) == 1 _assert_block_in_turn(blocks_manager2[0], signer2) + def test_four_signers(self) -> None: + signer1, signer2, signer3, signer4 = get_signer(), get_signer(), get_signer(), get_signer() + signer_id1, signer_id2, signer_id3 = signer1._signer_id, signer2._signer_id, signer3._signer_id + self.simulator.settings = get_settings(signer1, signer2, signer3, signer4, time_between_blocks=10) + manager1 = self._get_manager(signer1) + manager2 = self._get_manager(signer2) + manager3 = self._get_manager(signer3) + + connection12 = FakeConnection(manager1, manager2) + connection13 = FakeConnection(manager1, manager3) + self.simulator.add_connection(connection12) + self.simulator.add_connection(connection13) + + # all managers are producing blocks + self.simulator.run(110) + + # manager2 and manager3 leave + manager2.stop() + manager3.stop() + self.simulator.run(160) + + # manager1 produces out of turn blocks with decreasing weights + _assert_height_weight_signer_id( + manager1.tx_storage.get_all_transactions(), + [ + (1, poa.BLOCK_WEIGHT_IN_TURN, signer_id2), + (2, poa.BLOCK_WEIGHT_IN_TURN, signer_id3), + (3, poa.BLOCK_WEIGHT_OUT_OF_TURN / 1, signer_id1), + (4, poa.BLOCK_WEIGHT_IN_TURN, signer_id1), + (5, poa.BLOCK_WEIGHT_IN_TURN, signer_id2), + (6, poa.BLOCK_WEIGHT_IN_TURN, signer_id3), + (7, poa.BLOCK_WEIGHT_OUT_OF_TURN / 1, signer_id1), + (8, poa.BLOCK_WEIGHT_IN_TURN, signer_id1), + (9, poa.BLOCK_WEIGHT_OUT_OF_TURN / 3, signer_id1), + (10, poa.BLOCK_WEIGHT_OUT_OF_TURN / 2, signer_id1), + (11, poa.BLOCK_WEIGHT_OUT_OF_TURN / 1, signer_id1), + (12, poa.BLOCK_WEIGHT_IN_TURN, signer_id1), + ] + ) + def test_producer_leave_and_comeback(self) -> None: signer1, signer2 = get_signer(), get_signer() signer_id1, signer_id2 = signer1._signer_id, signer2._signer_id @@ -214,7 +252,7 @@ def test_producer_leave_and_comeback(self) -> None: # out of turn manager1 = self._get_manager(signer1) manager1.allow_mining_without_peers() - self.simulator.run(60) + self.simulator.run(50) manager2 = self._get_manager(signer2) connection = FakeConnection(manager1, manager2) @@ -222,33 +260,42 @@ def test_producer_leave_and_comeback(self) -> None: self.simulator.run(80) manager2.stop() - self.simulator.run(40) + connection.disconnect(Failure(Exception('testing'))) + self.simulator.remove_connection(connection) + self.simulator.run(70) + assert not manager2.can_start_mining() + self.simulator.add_connection(connection) + connection.reconnect() manager2.start() self.simulator.run(30) - assert manager1.tx_storage.get_block_count() == 23 - assert manager2.tx_storage.get_block_count() == 23 + assert manager1.tx_storage.get_block_count() == 19 + assert manager2.tx_storage.get_block_count() == 19 assert manager1.tx_storage.get_best_block_tips() == manager2.tx_storage.get_best_block_tips() _assert_height_weight_signer_id( manager1.tx_storage.get_all_transactions(), [ + # Before manager2 joins, only manager1 produces blocks (1, poa.BLOCK_WEIGHT_OUT_OF_TURN, signer_id1), (2, poa.BLOCK_WEIGHT_IN_TURN, signer_id1), (3, poa.BLOCK_WEIGHT_OUT_OF_TURN, signer_id1), (4, poa.BLOCK_WEIGHT_IN_TURN, signer_id1), (5, poa.BLOCK_WEIGHT_OUT_OF_TURN, signer_id1), (6, poa.BLOCK_WEIGHT_IN_TURN, signer_id1), + # When manager2 joins, both of them start taking turns (7, poa.BLOCK_WEIGHT_IN_TURN, signer_id2), (8, poa.BLOCK_WEIGHT_IN_TURN, signer_id1), (9, poa.BLOCK_WEIGHT_IN_TURN, signer_id2), (10, poa.BLOCK_WEIGHT_IN_TURN, signer_id1), (11, poa.BLOCK_WEIGHT_IN_TURN, signer_id2), (12, poa.BLOCK_WEIGHT_IN_TURN, signer_id1), + # manager2 leaves so manager1 produces all the next blocks (13, poa.BLOCK_WEIGHT_OUT_OF_TURN, signer_id1), (14, poa.BLOCK_WEIGHT_IN_TURN, signer_id1), (15, poa.BLOCK_WEIGHT_OUT_OF_TURN, signer_id1), + # manager2 comes back again, so both of them take turns again (16, poa.BLOCK_WEIGHT_IN_TURN, signer_id1), (17, poa.BLOCK_WEIGHT_IN_TURN, signer_id2), (18, poa.BLOCK_WEIGHT_IN_TURN, signer_id1), @@ -272,7 +319,7 @@ def test_existing_storage(self) -> None: manager1 = artifacts1.manager manager1.allow_mining_without_peers() - self.simulator.run(80) + self.simulator.run(50) assert manager1.tx_storage.get_block_count() == 6 _assert_height_weight_signer_id( @@ -296,7 +343,7 @@ def test_existing_storage(self) -> None: manager2 = artifacts.manager manager2.allow_mining_without_peers() - self.simulator.run(80) + self.simulator.run(60) assert manager2.tx_storage.get_block_count() == 12 _assert_height_weight_signer_id( @@ -332,7 +379,7 @@ def test_new_signer_added(self) -> None: manager_1a = artifacts_1a.manager manager_1a.allow_mining_without_peers() - self.simulator.run(80) + self.simulator.run(50) assert manager_1a.tx_storage.get_block_count() == 6 _assert_height_weight_signer_id( @@ -358,7 +405,7 @@ def test_new_signer_added(self) -> None: manager_1b = artifacts_1b.manager manager_1b.allow_mining_without_peers() - self.simulator.run(80) + self.simulator.run(90) assert manager_1b.tx_storage.get_block_count() == 11 # after we restart it, new blocks are alternating @@ -383,7 +430,7 @@ def test_new_signer_added(self) -> None: connection = FakeConnection(manager_1b, manager_2) self.simulator.add_connection(connection) - self.simulator.run(40) + self.simulator.run(60) # it should sync to the same blockchain _assert_height_weight_signer_id( @@ -463,7 +510,7 @@ def test_use_case(self) -> None: manager = self._get_manager(signer) manager.allow_mining_without_peers() - self.simulator.run(130) + self.simulator.run(100) assert manager.tx_storage.get_block_count() == 11 _assert_height_weight_signer_id(