diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index b22faa937..732c069c0 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -18,7 +18,7 @@ import struct from collections import OrderedDict from enum import Enum -from typing import TYPE_CHECKING, Any, Callable, Generator, Optional, cast +from typing import TYPE_CHECKING, Any, Callable, Generator, NamedTuple, Optional, cast from structlog import get_logger from twisted.internet.defer import Deferred, inlineCallbacks @@ -44,6 +44,20 @@ MAX_GET_TRANSACTIONS_BFS_LEN: int = 8 +class _HeightInfo(NamedTuple): + height: int + id: VertexId + + def __repr__(self): + return f'_HeightInfo({self.height}, {self.id.hex()})' + + def to_json(self) -> dict[str, Any]: + return { + 'height': self.height, + 'id': self.id.hex(), + } + + class PeerState(Enum): ERROR = 'error' UNKNOWN = 'unknown' @@ -92,16 +106,16 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None self.receiving_stream = False # highest block where we are synced - self.synced_height = 0 + self.synced_block: Optional[_HeightInfo] = None # highest block peer has - self.peer_height = 0 + self.peer_best_block: Optional[_HeightInfo] = None # Latest deferred waiting for a reply. self._deferred_txs: dict[VertexId, Deferred[BaseTransaction]] = {} self._deferred_tips: Optional[Deferred[list[bytes]]] = None - self._deferred_best_block: Optional[Deferred[dict[str, Any]]] = None - self._deferred_peer_block_hashes: Optional[Deferred[list[tuple[int, bytes]]]] = None + self._deferred_best_block: Optional[Deferred[_HeightInfo]] = None + self._deferred_peer_block_hashes: Optional[Deferred[list[_HeightInfo]]] = None # When syncing blocks we start streaming with all peers # so the moment I get some repeated blocks, I stop the download @@ -151,8 +165,8 @@ def get_status(self) -> dict[str, Any]: """ res = { 'is_enabled': self.is_sync_enabled(), - 'peer_height': self.peer_height, - 'synced_height': self.synced_height, + 'peer_best_block': self.peer_best_block.to_json() if self.peer_best_block else None, + 'synced_block': self.synced_block.to_json() if self.synced_block else None, 'synced': self._synced, 'state': self.state.value, } @@ -332,6 +346,14 @@ def run_sync_transactions(self) -> None: end_block_height=block_height) self.send_get_transactions_bfs(needed_txs, block.hash) + def get_my_best_block(self) -> _HeightInfo: + """Return my best block info.""" + bestblock = self.tx_storage.get_best_block() + assert bestblock.hash is not None + meta = bestblock.get_metadata() + assert meta.validation.is_fully_connected() + return _HeightInfo(height=bestblock.get_height(), id=bestblock.hash) + @inlineCallbacks def run_sync_blocks(self) -> Generator[Any, Any, None]: """ Async step of the block syncing phase. @@ -339,30 +361,28 @@ def run_sync_blocks(self) -> Generator[Any, Any, None]: assert self.tx_storage.indexes is not None self.state = PeerState.SYNCING_BLOCKS - # Find my height - bestblock = self.tx_storage.get_best_block() - assert bestblock.hash is not None - meta = bestblock.get_metadata() - my_height = meta.height - - self.log.debug('run sync blocks', my_height=my_height) + # Get my best block. + my_best_block = self.get_my_best_block() - # Find best block - data = yield self.get_peer_best_block() - peer_best_block = data['block'] - peer_best_height = data['height'] - self.peer_height = peer_best_height + # Find peer's best block + self.peer_best_block = yield self.get_peer_best_block() + assert self.peer_best_block is not None # find best common block - yield self.find_best_common_block(peer_best_height, peer_best_block) - self.log.debug('run_sync_blocks', peer_height=self.peer_height, synced_height=self.synced_height) - - if self.synced_height < self.peer_height: + self.synced_block = yield self.find_best_common_block(my_best_block, self.peer_best_block) + assert self.synced_block is not None + self.log.debug('run_sync_blocks', + my_best_block=my_best_block, + peer_best_block=self.peer_best_block, + synced_block=self.synced_block) + + if self.synced_block.height < self.peer_best_block.height: # sync from common block - peer_block_at_height = yield self.get_peer_block_hashes([self.synced_height]) - if peer_block_at_height: - self.run_block_sync(peer_block_at_height[0][1], self.synced_height, peer_best_block, peer_best_height) - elif my_height == self.synced_height == self.peer_height: + self.run_block_sync(self.synced_block.id, + self.synced_block.height, + self.peer_best_block.id, + self.peer_best_block.height) + elif my_best_block.height == self.synced_block.height == self.peer_best_block.height: # we're synced and on the same height, get their mempool self.state = PeerState.SYNCING_MEMPOOL self.mempool_manager.run() @@ -494,68 +514,69 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool: return self.tx_storage.transaction_exists(vertex_id) @inlineCallbacks - def find_best_common_block(self, peer_best_height: int, peer_best_block: bytes) -> Generator[Any, Any, None]: + def find_best_common_block(self, + my_best_block: _HeightInfo, + peer_best_block: _HeightInfo) -> Generator[Any, Any, _HeightInfo]: """ Search for the highest block/height where we're synced. """ - assert self.tx_storage.indexes is not None - my_best_height = self.tx_storage.get_height_best_block() - - self.log.debug('find common chain', peer_height=peer_best_height, my_height=my_best_height) + self.log.debug('find_best_common_block', peer_best_block=peer_best_block, my_best_block=my_best_block) - if peer_best_height <= my_best_height: - my_block = self.tx_storage.indexes.height.get(peer_best_height) - if my_block == peer_best_block: + if peer_best_block.height <= my_best_block.height: + assert self.tx_storage.indexes is not None + common_block_hash = self.tx_storage.indexes.height.get(peer_best_block.height) + if peer_best_block.id == common_block_hash: # we have all the peer's blocks - if peer_best_height == my_best_height: + if peer_best_block.height == my_best_block.height: # We are in sync, ask for relay so the remote sends transactions in real time self.update_synced(True) self.send_relay() else: self.update_synced(False) - - self.log.debug('synced to the latest peer block', height=peer_best_height) - self.synced_height = peer_best_height - return + self.log.debug('synced to the latest peer block', peer_best_block=peer_best_block) + return _HeightInfo(height=peer_best_block.height, id=common_block_hash) else: - # TODO peer is on a different best chain - self.log.warn('peer on different chain', peer_height=peer_best_height, - peer_block=peer_best_block.hex(), my_block=(my_block.hex() if my_block is not None else - None)) + # peer is on a different best chain + self.log.warn('peer on different chain', + peer_best_block=peer_best_block, + my_best_block=my_best_block) self.update_synced(False) - not_synced = min(peer_best_height, my_best_height) - synced = self.synced_height - - while not_synced - synced > 1: - self.log.debug('find_best_common_block synced not_synced', synced=synced, not_synced=not_synced) - step = math.ceil((not_synced - synced)/10) - heights = [] - height = synced - while height < not_synced: - heights.append(height) - height += step - heights.append(not_synced) + + # Run an n-ary search in the interval [lo, hi). + # `lo` is always a height where we are synced. + # `hi` is always a height where sync state is unknown. + hi = min(peer_best_block.height, my_best_block.height) + lo = 0 + + lo_block_hash = self._settings.GENESIS_BLOCK_HASH + + while hi - lo > 1: + self.log.info('find_best_common_block n-ary search query', lo=lo, hi=hi) + step = math.ceil((hi - lo) / 10) + heights = list(range(lo, hi, step)) + heights.append(hi) + block_height_list = yield self.get_peer_block_hashes(heights) - block_height_list.reverse() + block_height_list.sort(key=lambda x: x.height, reverse=True) + for height, block_hash in block_height_list: try: # We must check only fully validated transactions. blk = self.tx_storage.get_transaction(block_hash) + except TransactionDoesNotExist: + hi = height + else: assert blk.get_metadata().validation.is_fully_connected() assert isinstance(blk, Block) - if height != blk.get_height(): - # WTF?! It should never happen. - self.state = PeerState.ERROR - return - synced = height + assert height == blk.get_height() + lo = height + lo_block_hash = block_hash break - except TransactionDoesNotExist: - not_synced = height - self.log.debug('find_best_common_block finished synced not_synced', synced=synced, not_synced=not_synced) - self.synced_height = synced + self.log.debug('find_best_common_block n-ary search finished', lo=lo, hi=hi) + return _HeightInfo(height=lo, id=lo_block_hash) - def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[tuple[int, bytes]]]: + def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[_HeightInfo]]: """ Returns the peer's block hashes in the given heights. """ if self._deferred_peer_block_hashes is not None: @@ -597,7 +618,7 @@ def handle_peer_block_hashes(self, payload: str) -> None: """ Handle a PEER-BLOCK-HASHES message. """ data = json.loads(payload) - data = [(h, bytes.fromhex(block_hash)) for (h, block_hash) in data] + data = [_HeightInfo(height=h, id=bytes.fromhex(block_hash)) for (h, block_hash) in data] deferred = self._deferred_peer_block_hashes self._deferred_peer_block_hashes = None if deferred: @@ -799,7 +820,7 @@ def handle_stop_block_streaming(self, payload: str) -> None: self.blockchain_streaming.stop() self.blockchain_streaming = None - def get_peer_best_block(self) -> Deferred[dict[str, Any]]: + def get_peer_best_block(self) -> Deferred[_HeightInfo]: """ Async call to get the remote peer's best block. """ if self._deferred_best_block is not None: @@ -819,6 +840,7 @@ def handle_get_best_block(self, payload: str) -> None: """ best_block = self.tx_storage.get_best_block() meta = best_block.get_metadata() + assert meta.validation.is_fully_connected() data = {'block': best_block.hash_hex, 'height': meta.height} self.send_message(ProtocolMessages.BEST_BLOCK, json.dumps(data)) @@ -826,14 +848,14 @@ def handle_best_block(self, payload: str) -> None: """ Handle a BEST-BLOCK message. """ data = json.loads(payload) - assert self.protocol.connections is not None - self.log.debug('got best block', **data) - data['block'] = bytes.fromhex(data['block']) + _id = bytes.fromhex(data['block']) + height = data['height'] + best_block = _HeightInfo(height=height, id=_id) deferred = self._deferred_best_block self._deferred_best_block = None if deferred: - deferred.callback(data) + deferred.callback(best_block) def _setup_tx_streaming(self): """ Common setup before starting an outgoing transaction stream. diff --git a/tests/p2p/test_get_best_blockchain.py b/tests/p2p/test_get_best_blockchain.py index 806444be0..11e71db34 100644 --- a/tests/p2p/test_get_best_blockchain.py +++ b/tests/p2p/test_get_best_blockchain.py @@ -18,6 +18,8 @@ class BaseGetBestBlockchainTestCase(SimulatorTestCase): + seed_config = 6 + def _send_cmd(self, proto, cmd, payload=None): if not payload: line = '{}\r\n'.format(cmd) diff --git a/tests/p2p/test_sync.py b/tests/p2p/test_sync.py index ae8af2bb6..1b55adcfd 100644 --- a/tests/p2p/test_sync.py +++ b/tests/p2p/test_sync.py @@ -503,9 +503,9 @@ def test_sync_metadata(self): # check they have the same consensus node_sync1 = conn.proto1.state.sync_agent node_sync2 = conn.proto2.state.sync_agent - self.assertEqual(node_sync1.peer_height, height) - self.assertEqual(node_sync1.synced_height, height) - self.assertEqual(node_sync2.peer_height, height) + self.assertEqual(node_sync1.peer_best_block.height, height) + self.assertEqual(node_sync1.synced_block.height, height) + self.assertEqual(node_sync2.peer_best_block.height, height) # 3 genesis + blocks + 8 txs self.assertEqual(self.manager1.tx_storage.get_vertices_count(), height + 11) self.assertEqual(manager2.tx_storage.get_vertices_count(), height + 11) @@ -527,14 +527,14 @@ def test_tx_propagation_nat_peers(self): node_sync1 = self.conn1.proto1.state.sync_agent self.assertEqual(self.manager1.tx_storage.latest_timestamp, self.manager2.tx_storage.latest_timestamp) - self.assertEqual(node_sync1.peer_height, node_sync1.synced_height) - self.assertEqual(node_sync1.peer_height, self.manager1.tx_storage.get_height_best_block()) + self.assertEqual(node_sync1.peer_best_block, node_sync1.synced_block) + self.assertEqual(node_sync1.peer_best_block.height, self.manager1.tx_storage.get_height_best_block()) self.assertConsensusEqual(self.manager1, self.manager2) node_sync2 = self.conn2.proto1.state.sync_agent self.assertEqual(self.manager2.tx_storage.latest_timestamp, self.manager3.tx_storage.latest_timestamp) - self.assertEqual(node_sync2.peer_height, node_sync2.synced_height) - self.assertEqual(node_sync2.peer_height, self.manager2.tx_storage.get_height_best_block()) + self.assertEqual(node_sync2.peer_best_block, node_sync2.synced_block) + self.assertEqual(node_sync2.peer_best_block.height, self.manager2.tx_storage.get_height_best_block()) self.assertConsensusEqual(self.manager2, self.manager3) def test_block_sync_new_blocks_and_txs(self): @@ -560,8 +560,8 @@ def test_block_sync_new_blocks_and_txs(self): node_sync = conn.proto1.state.sync_agent self.assertEqual(self.manager1.tx_storage.latest_timestamp, manager2.tx_storage.latest_timestamp) - self.assertEqual(node_sync.peer_height, node_sync.synced_height) - self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block()) + self.assertEqual(node_sync.peer_best_block, node_sync.synced_block) + self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block()) self.assertConsensusEqual(self.manager1, manager2) self.assertConsensusValid(self.manager1) self.assertConsensusValid(manager2) @@ -581,8 +581,8 @@ def test_block_sync_many_new_blocks(self): self.clock.advance(1) node_sync = conn.proto1.state.sync_agent - self.assertEqual(node_sync.peer_height, node_sync.synced_height) - self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block()) + self.assertEqual(node_sync.peer_best_block, node_sync.synced_block) + self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block()) self.assertConsensusEqual(self.manager1, manager2) self.assertConsensusValid(self.manager1) self.assertConsensusValid(manager2) @@ -602,8 +602,8 @@ def test_block_sync_new_blocks(self): self.clock.advance(1) node_sync = conn.proto1.state.sync_agent - self.assertEqual(node_sync.peer_height, node_sync.synced_height) - self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block()) + self.assertEqual(node_sync.peer_best_block, node_sync.synced_block) + self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block()) self.assertConsensusEqual(self.manager1, manager2) self.assertConsensusValid(self.manager1) self.assertConsensusValid(manager2) @@ -664,9 +664,9 @@ def test_full_sync(self): node_sync1 = conn.proto1.state.sync_agent node_sync2 = conn.proto2.state.sync_agent - self.assertEqual(node_sync1.peer_height, common_height) - self.assertEqual(node_sync1.synced_height, common_height) - self.assertEqual(node_sync2.peer_height, common_height) + self.assertEqual(node_sync1.peer_best_block.height, common_height) + self.assertEqual(node_sync1.synced_block.height, common_height) + self.assertEqual(node_sync2.peer_best_block.height, common_height) self.assertConsensusValid(self.manager1) self.assertConsensusValid(manager2) self.assertConsensusEqual(self.manager1, manager2) @@ -715,9 +715,9 @@ def test_block_sync_checkpoints(self): node_sync1 = conn.proto1.state.sync_agent node_sync2 = conn.proto2.state.sync_agent - self.assertEqual(node_sync1.peer_height, TOTAL_BLOCKS) - self.assertEqual(node_sync1.synced_height, TOTAL_BLOCKS) - self.assertEqual(node_sync2.peer_height, len(blocks)) + self.assertEqual(node_sync1.peer_best_block.height, TOTAL_BLOCKS) + self.assertEqual(node_sync1.synced_block.height, TOTAL_BLOCKS) + self.assertEqual(node_sync2.peer_best_block.height, len(blocks)) self.assertConsensusValid(self.manager1) self.assertConsensusValid(manager2) @@ -738,8 +738,8 @@ def test_block_sync_only_genesis(self): self.clock.advance(1) node_sync = conn.proto1.state.sync_agent - self.assertEqual(node_sync.synced_height, 0) - self.assertEqual(node_sync.peer_height, 0) + self.assertEqual(node_sync.synced_block.height, 0) + self.assertEqual(node_sync.peer_best_block.height, 0) self.assertEqual(self.manager1.tx_storage.get_vertices_count(), 3) self.assertEqual(manager2.tx_storage.get_vertices_count(), 3) diff --git a/tests/simulation/test_simulator.py b/tests/simulation/test_simulator.py index 20c8b050f..0df5dfdfe 100644 --- a/tests/simulation/test_simulator.py +++ b/tests/simulation/test_simulator.py @@ -157,15 +157,11 @@ class SyncV1RandomSimulatorTestCase(unittest.SyncV1Params, BaseRandomSimulatorTe class SyncV2RandomSimulatorTestCase(unittest.SyncV2Params, BaseRandomSimulatorTestCase): __test__ = True - seed_config = 3 - # sync-bridge should behave like sync-v2 class SyncBridgeRandomSimulatorTestCase(unittest.SyncBridgeParams, SyncV2RandomSimulatorTestCase): __test__ = True - seed_config = 4 - def test_compare_mempool_implementations(self): manager1 = self.create_peer() manager2 = self.create_peer() diff --git a/tests/unittest.py b/tests/unittest.py index 8231f3d5a..f8c941ca7 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -10,7 +10,6 @@ from twisted.trial import unittest from hathor.builder import BuildArtifacts, Builder -from hathor.cli.util import setup_logging from hathor.conf import HathorSettings from hathor.conf.get_settings import get_settings from hathor.daa import TestMode, _set_test_mode @@ -105,7 +104,6 @@ class TestCase(unittest.TestCase): seed_config: Optional[int] = None def setUp(self): - setup_logging() _set_test_mode(TestMode.TEST_ALL_WEIGHT) self.tmpdirs = [] self.clock = TestMemoryReactorClock() @@ -473,7 +471,7 @@ def assertV1SyncedProgress(self, node_sync): self.assertEqual(node_sync.synced_timestamp, node_sync.peer_timestamp) def assertV2SyncedProgress(self, node_sync): - self.assertEqual(node_sync.synced_height, node_sync.peer_height) + self.assertEqual(node_sync.synced_block, node_sync.peer_best_block) def clean_tmpdirs(self): for tmpdir in self.tmpdirs: