Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hathor/consensus/poa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
calculate_weight,
get_active_signers,
get_hashed_poa_data,
in_turn_signer_index,
get_signer_index_distance,
verify_poa_signature,
)
from .poa_block_producer import PoaBlockProducer
Expand All @@ -18,7 +18,6 @@
'BLOCK_WEIGHT_OUT_OF_TURN',
'SIGNER_ID_LEN',
'get_hashed_poa_data',
'in_turn_signer_index',
'calculate_weight',
'PoaBlockProducer',
'PoaSigner',
Expand All @@ -27,4 +26,5 @@
'InvalidSignature',
'ValidSignature',
'get_active_signers',
'get_signer_index_distance',
]
15 changes: 10 additions & 5 deletions hathor/consensus/poa/poa.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,21 @@ def get_active_signers(settings: PoaSettings, height: int) -> list[bytes]:
return active_signers


def get_signer_index_distance(*, settings: PoaSettings, signer_index: int, height: int) -> int:
    """Return the distance from `signer_index` to the expected (in-turn) signer for a block height.

    When the distance is 0, it is the signer's turn.

    Args:
        settings: the PoA settings, used to resolve the active signer set.
        signer_index: the index of the signer in the active signer list.
        height: the height of the block being considered.
    """
    active_signers = get_active_signers(settings, height)
    # Turns rotate round-robin with the height.
    expected_index = height % len(active_signers)
    index_distance = (signer_index - expected_index) % len(active_signers)
    assert 0 <= index_distance < len(active_signers)
    return index_distance


def calculate_weight(settings: PoaSettings, block: PoaBlock, signer_index: int) -> float:
    """Return the weight for the given block and signer.

    In-turn signers get `BLOCK_WEIGHT_IN_TURN`; out-of-turn signers get a weight that
    decreases with their distance to the expected signer, so closer signers win conflicts.
    """
    index_distance = get_signer_index_distance(settings=settings, signer_index=signer_index, height=block.get_height())
    return BLOCK_WEIGHT_IN_TURN if index_distance == 0 else BLOCK_WEIGHT_OUT_OF_TURN / index_distance


@dataclass(frozen=True, slots=True)
Expand Down
95 changes: 61 additions & 34 deletions hathor/consensus/poa/poa_block_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from hathor.consensus import poa
from hathor.consensus.consensus_settings import PoaSettings
from hathor.crypto.util import get_public_key_bytes_compressed
from hathor.pubsub import EventArguments, HathorEvents
from hathor.reactor import ReactorProtocol
from hathor.util import not_none

Expand All @@ -35,11 +36,8 @@

logger = get_logger()

# Number of seconds to wait for a sync to finish before trying to produce blocks
_WAIT_SYNC_DELAY: int = 30

# Number of seconds used between each signer depending on its distance to the expected signer
_SIGNER_TURN_INTERVAL: int = 1
_SIGNER_TURN_INTERVAL: int = 10


class PoaBlockProducer:
Expand All @@ -54,11 +52,9 @@ class PoaBlockProducer:
'_reactor',
'_manager',
'_poa_signer',
'_started_producing',
'_start_producing_lc',
'_schedule_block_lc',
'_last_seen_best_block',
'_delayed_call',
'_start_producing_lc',
)

def __init__(self, *, settings: HathorSettings, reactor: ReactorProtocol, poa_signer: PoaSigner) -> None:
Expand All @@ -70,14 +66,9 @@ def __init__(self, *, settings: HathorSettings, reactor: ReactorProtocol, poa_si
self._manager: HathorManager | None = None
self._poa_signer = poa_signer
self._last_seen_best_block: Block | None = None

self._started_producing = False
self._start_producing_lc = LoopingCall(self._start_producing)
self._start_producing_lc.clock = self._reactor

self._schedule_block_lc = LoopingCall(self._schedule_block)
self._schedule_block_lc.clock = self._reactor
self._delayed_call: IDelayedCall | None = None
self._start_producing_lc: LoopingCall = LoopingCall(self._safe_start_producing)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't understand why we need this LoopingCall. Anyway, I'm ok to get this PR merged and get this changed later because it has no impact on mainnet and testnet.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using either a LoopingCall or a recursive callLater, separate from the main callLater logic (triggered reactively via pubsub), was the only good way I found to implement this while covering the following requirements:

  • A single full node should be able to produce blocks by itself.
  • If block production throws during the "start producing" phase, production should be tried again. However, if it fails after a call via pubsub, it should not be retried.
  • We have to consider that the call to can_start_mining() may throw an exception.

I created an issue to review this later: #1097

self._start_producing_lc.clock = self._reactor

@property
def manager(self) -> HathorManager:
Expand All @@ -89,19 +80,18 @@ def manager(self, manager: HathorManager) -> None:
self._manager = manager

def start(self) -> None:
self._start_producing_lc.start(_WAIT_SYNC_DELAY)
self._schedule_block_lc.start(self._settings.AVG_TIME_BETWEEN_BLOCKS)
self.manager.pubsub.subscribe(HathorEvents.NETWORK_NEW_TX_ACCEPTED, self._on_new_vertex)
self._start_producing_lc.start(self._settings.AVG_TIME_BETWEEN_BLOCKS)

def stop(self) -> None:
if self._start_producing_lc.running:
self._start_producing_lc.stop()

if self._schedule_block_lc.running:
self._schedule_block_lc.stop()
self.manager.pubsub.unsubscribe(HathorEvents.NETWORK_NEW_TX_ACCEPTED, self._on_new_vertex)

if self._delayed_call and self._delayed_call.active():
self._delayed_call.cancel()

if self._start_producing_lc.running:
self._start_producing_lc.stop()

def _get_signer_index(self, previous_block: Block) -> int | None:
"""Return our signer index considering the active signers."""
height = previous_block.get_height() + 1
Expand All @@ -113,21 +103,49 @@ def _get_signer_index(self, previous_block: Block) -> int | None:
except ValueError:
return None

def _start_producing(self) -> None:
def _safe_start_producing(self) -> None:
try:
return self._unsafe_start_producing()
except Exception:
self._log.exception('error while trying to start block production')

def _unsafe_start_producing(self) -> None:
"""Start producing new blocks."""
if not self.manager.can_start_mining():
# We're syncing, so we'll try again later
self._log.warn('cannot start producing new blocks, node not synced')
return

self._log.info('started producing new blocks')
self._started_producing = True
self._start_producing_lc.stop()
self._schedule_block()

def _on_new_vertex(self, event: HathorEvents, args: EventArguments) -> None:
"""Handle propagation of new blocks after a vertex is received."""
assert event is HathorEvents.NETWORK_NEW_TX_ACCEPTED
block = args.tx

from hathor.transaction import Block
if not isinstance(block, Block):
return

from hathor.transaction.poa import PoaBlock
if isinstance(block, PoaBlock) and not block.weight == poa.BLOCK_WEIGHT_IN_TURN:
self._log.info('received out of turn block', block=block.hash_hex, signer_id=block.signer_id)

self._schedule_block()

def _schedule_block(self) -> None:
"""Schedule propagation of a new block."""
if not self.manager.can_start_mining():
# We're syncing, so we'll try again later
self._log.info('cannot produce new block, node not synced')
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that the full node would not recover from getting out of sync after it was already synced. By "recover", I mean that this full node would stop producing blocks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The full node recovers as long as it receives a block from another peer. I added an assertion in an existing test to cover this, in 0d4fea6

return

if self._start_producing_lc.running:
self._start_producing_lc.stop()

previous_block = self.manager.tx_storage.get_best_block()
if not self._started_producing or previous_block == self._last_seen_best_block:
if previous_block == self._last_seen_best_block:
return

self._last_seen_best_block = previous_block
Expand All @@ -139,7 +157,11 @@ def _schedule_block(self) -> None:
expected_timestamp = self._expected_block_timestamp(previous_block, signer_index)
propagation_delay = 0 if expected_timestamp < now else expected_timestamp - now

if self._delayed_call and self._delayed_call.active():
self._delayed_call.cancel()

self._delayed_call = self._reactor.callLater(propagation_delay, self._produce_block, previous_block)

self._log.debug(
'scheduling block production',
previous_block=previous_block.hash_hex,
Expand All @@ -153,30 +175,35 @@ def _produce_block(self, previous_block: PoaBlock) -> None:
block_templates = self.manager.get_block_templates(parent_block_hash=previous_block.hash)
block = block_templates.generate_mining_block(self.manager.rng, cls=PoaBlock)
assert isinstance(block, PoaBlock)

if block.get_height() <= self.manager.tx_storage.get_height_best_block():
return

signer_index = self._get_signer_index(previous_block)
block.weight = poa.calculate_weight(self._poa_settings, block, not_none(signer_index))
self._poa_signer.sign_block(block)
block.update_hash()

self.manager.on_new_tx(block, propagate_to_peers=False, fails_silently=False)
if not block.get_metadata().voided_by:
self.manager.connections.send_tx_to_peers(block)

self._log.debug(
self._log.info(
'produced new block',
block=block.hash_hex,
height=block.get_height(),
weight=block.weight,
parent=block.get_block_parent_hash().hex(),
voided=bool(block.get_metadata().voided_by),
)
self.manager.on_new_tx(block, propagate_to_peers=True, fails_silently=False)

def _expected_block_timestamp(self, previous_block: Block, signer_index: int) -> int:
"""Calculate the expected timestamp for a new block."""
height = previous_block.get_height() + 1
expected_index = poa.in_turn_signer_index(settings=self._poa_settings, height=height)
signers = poa.get_active_signers(self._poa_settings, height)
index_distance = (signer_index - expected_index) % len(signers)
assert 0 <= index_distance < len(signers)
index_distance = poa.get_signer_index_distance(
settings=self._poa_settings,
signer_index=signer_index,
height=height,
)
delay = _SIGNER_TURN_INTERVAL * index_distance
if index_distance > 0:
# if it's not our turn, we add a constant offset to the delay
delay += self._settings.AVG_TIME_BETWEEN_BLOCKS
return previous_block.timestamp + self._settings.AVG_TIME_BETWEEN_BLOCKS + delay
41 changes: 28 additions & 13 deletions tests/poa/test_poa.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,40 +218,55 @@ def get_signer() -> tuple[PoaSigner, bytes]:
@pytest.mark.parametrize(
    ['n_signers', 'height', 'signer_index', 'expected'],
    [
        # single signer: always in turn
        (1, 1, 0, 0),
        (1, 2, 0, 0),
        (1, 3, 0, 0),

        # two signers: turns alternate with height
        (2, 1, 0, 1),
        (2, 2, 0, 0),
        (2, 3, 0, 1),

        (2, 1, 1, 0),
        (2, 2, 1, 1),
        (2, 3, 1, 0),

        # five signers: distance wraps around modulo the signer count
        (5, 1, 0, 4),
        (5, 2, 0, 3),
        (5, 3, 0, 2),
        (5, 4, 0, 1),
        (5, 5, 0, 0),
    ]
)
def test_get_signer_index_distance(n_signers: int, height: int, signer_index: int, expected: int) -> None:
    settings = PoaSettings.construct(signers=tuple(PoaSignerSettings(public_key=b'') for _ in range(n_signers)))

    result = poa.get_signer_index_distance(settings=settings, signer_index=signer_index, height=height)
    assert result == expected


@pytest.mark.parametrize(
['n_signers', 'height', 'signer_index', 'expected'],
[
(1, 0, 0, poa.BLOCK_WEIGHT_IN_TURN),
(1, 1, 0, poa.BLOCK_WEIGHT_IN_TURN),
(1, 2, 0, poa.BLOCK_WEIGHT_IN_TURN),
(1, 3, 0, poa.BLOCK_WEIGHT_IN_TURN),

(2, 0, 0, poa.BLOCK_WEIGHT_IN_TURN),
(2, 1, 0, poa.BLOCK_WEIGHT_OUT_OF_TURN),
(2, 2, 0, poa.BLOCK_WEIGHT_IN_TURN),
(2, 3, 0, poa.BLOCK_WEIGHT_OUT_OF_TURN),

(2, 0, 1, poa.BLOCK_WEIGHT_OUT_OF_TURN),
(2, 1, 1, poa.BLOCK_WEIGHT_IN_TURN),
(2, 2, 1, poa.BLOCK_WEIGHT_OUT_OF_TURN),
(2, 3, 1, poa.BLOCK_WEIGHT_IN_TURN),

(5, 0, 0, poa.BLOCK_WEIGHT_IN_TURN),
(5, 1, 0, poa.BLOCK_WEIGHT_OUT_OF_TURN / 4),
(5, 2, 0, poa.BLOCK_WEIGHT_OUT_OF_TURN / 3),
(5, 3, 0, poa.BLOCK_WEIGHT_OUT_OF_TURN / 2),
(5, 4, 0, poa.BLOCK_WEIGHT_OUT_OF_TURN / 1),
]
)
def test_calculate_weight(n_signers: int, height: int, signer_index: int, expected: float) -> None:
Expand Down
20 changes: 9 additions & 11 deletions tests/poa/test_poa_block_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def test_poa_block_producer_one_signer() -> None:

# when we can start mining, we start producing blocks
manager.can_start_mining = Mock(return_value=True)
reactor.advance(20)

# we produce our first block
reactor.advance(10)
Expand Down Expand Up @@ -116,7 +115,6 @@ def test_poa_block_producer_two_signers() -> None:

# when we can start mining, we start producing blocks
manager.can_start_mining = Mock(return_value=True)
reactor.advance(20)

# we produce our first block
reactor.advance(10)
Expand Down Expand Up @@ -144,14 +142,14 @@ def test_poa_block_producer_two_signers() -> None:
manager.on_new_tx.reset_mock()

# haven't produced the third block yet
reactor.advance(9)
reactor.advance(29)

# we produce our third block
reactor.advance(2)
reactor.advance(1)
manager.on_new_tx.assert_called_once()
block3 = manager.on_new_tx.call_args.args[0]
assert isinstance(block3, PoaBlock)
assert block3.timestamp == block2.timestamp + 11
assert block3.timestamp == block2.timestamp + 30
assert block3.weight == poa.BLOCK_WEIGHT_OUT_OF_TURN
assert block3.outputs == []
assert block3.get_block_parent_hash() == block2.hash
Expand All @@ -161,15 +159,15 @@ def test_poa_block_producer_two_signers() -> None:
@pytest.mark.parametrize(
['previous_height', 'signer_index', 'expected_delay'],
[
(0, 0, 33),
(0, 0, 90),
(0, 1, 30),
(0, 2, 31),
(0, 3, 32),
(0, 2, 70),
(0, 3, 80),

(1, 0, 32),
(1, 1, 33),
(1, 0, 80),
(1, 1, 90),
(1, 2, 30),
(1, 3, 31),
(1, 3, 70),
]
)
def test_expected_block_timestamp(previous_height: int, signer_index: int, expected_delay: int) -> None:
Expand Down
Loading