Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 95 additions & 73 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import struct
from collections import OrderedDict
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, Generator, Optional, cast
from typing import TYPE_CHECKING, Any, Callable, Generator, NamedTuple, Optional, cast

from structlog import get_logger
from twisted.internet.defer import Deferred, inlineCallbacks
Expand All @@ -44,6 +44,20 @@
MAX_GET_TRANSACTIONS_BFS_LEN: int = 8


class _HeightInfo(NamedTuple):
height: int
id: VertexId

def __repr__(self):
return f'_HeightInfo({self.height}, {self.id.hex()})'

def to_json(self) -> dict[str, Any]:
return {
'height': self.height,
'id': self.id.hex(),
}


class PeerState(Enum):
ERROR = 'error'
UNKNOWN = 'unknown'
Expand Down Expand Up @@ -92,16 +106,16 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
self.receiving_stream = False

# highest block where we are synced
self.synced_height = 0
self.synced_block: Optional[_HeightInfo] = None

# highest block peer has
self.peer_height = 0
self.peer_best_block: Optional[_HeightInfo] = None

# Latest deferred waiting for a reply.
self._deferred_txs: dict[VertexId, Deferred[BaseTransaction]] = {}
self._deferred_tips: Optional[Deferred[list[bytes]]] = None
self._deferred_best_block: Optional[Deferred[dict[str, Any]]] = None
self._deferred_peer_block_hashes: Optional[Deferred[list[tuple[int, bytes]]]] = None
self._deferred_best_block: Optional[Deferred[_HeightInfo]] = None
self._deferred_peer_block_hashes: Optional[Deferred[list[_HeightInfo]]] = None

# When syncing blocks we start streaming with all peers
# so the moment I get some repeated blocks, I stop the download
Expand Down Expand Up @@ -151,8 +165,8 @@ def get_status(self) -> dict[str, Any]:
"""
res = {
'is_enabled': self.is_sync_enabled(),
'peer_height': self.peer_height,
'synced_height': self.synced_height,
'peer_best_block': self.peer_best_block.to_json() if self.peer_best_block else None,
'synced_block': self.synced_block.to_json() if self.synced_block else None,
'synced': self._synced,
'state': self.state.value,
}
Expand Down Expand Up @@ -332,37 +346,43 @@ def run_sync_transactions(self) -> None:
end_block_height=block_height)
self.send_get_transactions_bfs(needed_txs, block.hash)

def get_my_best_block(self) -> _HeightInfo:
"""Return my best block info."""
bestblock = self.tx_storage.get_best_block()
assert bestblock.hash is not None
meta = bestblock.get_metadata()
assert meta.validation.is_fully_connected()
return _HeightInfo(height=bestblock.get_height(), id=bestblock.hash)

@inlineCallbacks
def run_sync_blocks(self) -> Generator[Any, Any, None]:
""" Async step of the block syncing phase.
"""
assert self.tx_storage.indexes is not None
self.state = PeerState.SYNCING_BLOCKS

# Find my height
bestblock = self.tx_storage.get_best_block()
assert bestblock.hash is not None
meta = bestblock.get_metadata()
my_height = meta.height

self.log.debug('run sync blocks', my_height=my_height)
# Get my best block.
my_best_block = self.get_my_best_block()

# Find best block
data = yield self.get_peer_best_block()
peer_best_block = data['block']
peer_best_height = data['height']
self.peer_height = peer_best_height
# Find peer's best block
self.peer_best_block = yield self.get_peer_best_block()
assert self.peer_best_block is not None

# find best common block
yield self.find_best_common_block(peer_best_height, peer_best_block)
self.log.debug('run_sync_blocks', peer_height=self.peer_height, synced_height=self.synced_height)

if self.synced_height < self.peer_height:
self.synced_block = yield self.find_best_common_block(my_best_block, self.peer_best_block)
assert self.synced_block is not None
self.log.debug('run_sync_blocks',
my_best_block=my_best_block,
peer_best_block=self.peer_best_block,
synced_block=self.synced_block)

if self.synced_block.height < self.peer_best_block.height:
# sync from common block
peer_block_at_height = yield self.get_peer_block_hashes([self.synced_height])
if peer_block_at_height:
self.run_block_sync(peer_block_at_height[0][1], self.synced_height, peer_best_block, peer_best_height)
elif my_height == self.synced_height == self.peer_height:
self.run_block_sync(self.synced_block.id,
self.synced_block.height,
self.peer_best_block.id,
self.peer_best_block.height)
elif my_best_block.height == self.synced_block.height == self.peer_best_block.height:
# we're synced and on the same height, get their mempool
self.state = PeerState.SYNCING_MEMPOOL
self.mempool_manager.run()
Expand Down Expand Up @@ -494,68 +514,69 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
return self.tx_storage.transaction_exists(vertex_id)

@inlineCallbacks
def find_best_common_block(self, peer_best_height: int, peer_best_block: bytes) -> Generator[Any, Any, None]:
def find_best_common_block(self,
my_best_block: _HeightInfo,
peer_best_block: _HeightInfo) -> Generator[Any, Any, _HeightInfo]:
""" Search for the highest block/height where we're synced.
"""
assert self.tx_storage.indexes is not None
my_best_height = self.tx_storage.get_height_best_block()

self.log.debug('find common chain', peer_height=peer_best_height, my_height=my_best_height)
self.log.debug('find_best_common_block', peer_best_block=peer_best_block, my_best_block=my_best_block)

if peer_best_height <= my_best_height:
my_block = self.tx_storage.indexes.height.get(peer_best_height)
if my_block == peer_best_block:
if peer_best_block.height <= my_best_block.height:
assert self.tx_storage.indexes is not None
common_block_hash = self.tx_storage.indexes.height.get(peer_best_block.height)
if peer_best_block.id == common_block_hash:
# we have all the peer's blocks
if peer_best_height == my_best_height:
if peer_best_block.height == my_best_block.height:
# We are in sync, ask for relay so the remote sends transactions in real time
self.update_synced(True)
self.send_relay()
else:
self.update_synced(False)

self.log.debug('synced to the latest peer block', height=peer_best_height)
self.synced_height = peer_best_height
return
self.log.debug('synced to the latest peer block', peer_best_block=peer_best_block)
return _HeightInfo(height=peer_best_block.height, id=common_block_hash)
else:
# TODO peer is on a different best chain
self.log.warn('peer on different chain', peer_height=peer_best_height,
peer_block=peer_best_block.hex(), my_block=(my_block.hex() if my_block is not None else
None))
# peer is on a different best chain
self.log.warn('peer on different chain',
peer_best_block=peer_best_block,
my_best_block=my_best_block)

self.update_synced(False)
not_synced = min(peer_best_height, my_best_height)
synced = self.synced_height

while not_synced - synced > 1:
self.log.debug('find_best_common_block synced not_synced', synced=synced, not_synced=not_synced)
step = math.ceil((not_synced - synced)/10)
heights = []
height = synced
while height < not_synced:
heights.append(height)
height += step
heights.append(not_synced)

# Run an n-ary search in the interval [lo, hi).
# `lo` is always a height where we are synced.
# `hi` is always a height where sync state is unknown.
hi = min(peer_best_block.height, my_best_block.height)
lo = 0

lo_block_hash = self._settings.GENESIS_BLOCK_HASH

while hi - lo > 1:
self.log.info('find_best_common_block n-ary search query', lo=lo, hi=hi)
step = math.ceil((hi - lo) / 10)
heights = list(range(lo, hi, step))
heights.append(hi)

block_height_list = yield self.get_peer_block_hashes(heights)
block_height_list.reverse()
block_height_list.sort(key=lambda x: x.height, reverse=True)

for height, block_hash in block_height_list:
try:
# We must check only fully validated transactions.
blk = self.tx_storage.get_transaction(block_hash)
except TransactionDoesNotExist:
hi = height
else:
assert blk.get_metadata().validation.is_fully_connected()
assert isinstance(blk, Block)
if height != blk.get_height():
# WTF?! It should never happen.
self.state = PeerState.ERROR
return
synced = height
assert height == blk.get_height()
lo = height
lo_block_hash = block_hash
break
except TransactionDoesNotExist:
not_synced = height

self.log.debug('find_best_common_block finished synced not_synced', synced=synced, not_synced=not_synced)
self.synced_height = synced
self.log.debug('find_best_common_block n-ary search finished', lo=lo, hi=hi)
return _HeightInfo(height=lo, id=lo_block_hash)

def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[tuple[int, bytes]]]:
def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[_HeightInfo]]:
""" Returns the peer's block hashes in the given heights.
"""
if self._deferred_peer_block_hashes is not None:
Expand Down Expand Up @@ -597,7 +618,7 @@ def handle_peer_block_hashes(self, payload: str) -> None:
""" Handle a PEER-BLOCK-HASHES message.
"""
data = json.loads(payload)
data = [(h, bytes.fromhex(block_hash)) for (h, block_hash) in data]
data = [_HeightInfo(height=h, id=bytes.fromhex(block_hash)) for (h, block_hash) in data]
deferred = self._deferred_peer_block_hashes
self._deferred_peer_block_hashes = None
if deferred:
Expand Down Expand Up @@ -799,7 +820,7 @@ def handle_stop_block_streaming(self, payload: str) -> None:
self.blockchain_streaming.stop()
self.blockchain_streaming = None

def get_peer_best_block(self) -> Deferred[dict[str, Any]]:
def get_peer_best_block(self) -> Deferred[_HeightInfo]:
""" Async call to get the remote peer's best block.
"""
if self._deferred_best_block is not None:
Expand All @@ -819,21 +840,22 @@ def handle_get_best_block(self, payload: str) -> None:
"""
best_block = self.tx_storage.get_best_block()
meta = best_block.get_metadata()
assert meta.validation.is_fully_connected()
data = {'block': best_block.hash_hex, 'height': meta.height}
self.send_message(ProtocolMessages.BEST_BLOCK, json.dumps(data))

def handle_best_block(self, payload: str) -> None:
""" Handle a BEST-BLOCK message.
"""
data = json.loads(payload)
assert self.protocol.connections is not None
self.log.debug('got best block', **data)
data['block'] = bytes.fromhex(data['block'])
_id = bytes.fromhex(data['block'])
height = data['height']
best_block = _HeightInfo(height=height, id=_id)

deferred = self._deferred_best_block
self._deferred_best_block = None
if deferred:
deferred.callback(data)
deferred.callback(best_block)

def _setup_tx_streaming(self):
""" Common setup before starting an outgoing transaction stream.
Expand Down
2 changes: 2 additions & 0 deletions tests/p2p/test_get_best_blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

class BaseGetBestBlockchainTestCase(SimulatorTestCase):

seed_config = 6

def _send_cmd(self, proto, cmd, payload=None):
if not payload:
line = '{}\r\n'.format(cmd)
Expand Down
42 changes: 21 additions & 21 deletions tests/p2p/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,9 +503,9 @@ def test_sync_metadata(self):
# check they have the same consensus
node_sync1 = conn.proto1.state.sync_agent
node_sync2 = conn.proto2.state.sync_agent
self.assertEqual(node_sync1.peer_height, height)
self.assertEqual(node_sync1.synced_height, height)
self.assertEqual(node_sync2.peer_height, height)
self.assertEqual(node_sync1.peer_best_block.height, height)
self.assertEqual(node_sync1.synced_block.height, height)
self.assertEqual(node_sync2.peer_best_block.height, height)
# 3 genesis + blocks + 8 txs
self.assertEqual(self.manager1.tx_storage.get_vertices_count(), height + 11)
self.assertEqual(manager2.tx_storage.get_vertices_count(), height + 11)
Expand All @@ -527,14 +527,14 @@ def test_tx_propagation_nat_peers(self):

node_sync1 = self.conn1.proto1.state.sync_agent
self.assertEqual(self.manager1.tx_storage.latest_timestamp, self.manager2.tx_storage.latest_timestamp)
self.assertEqual(node_sync1.peer_height, node_sync1.synced_height)
self.assertEqual(node_sync1.peer_height, self.manager1.tx_storage.get_height_best_block())
self.assertEqual(node_sync1.peer_best_block, node_sync1.synced_block)
self.assertEqual(node_sync1.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
self.assertConsensusEqual(self.manager1, self.manager2)

node_sync2 = self.conn2.proto1.state.sync_agent
self.assertEqual(self.manager2.tx_storage.latest_timestamp, self.manager3.tx_storage.latest_timestamp)
self.assertEqual(node_sync2.peer_height, node_sync2.synced_height)
self.assertEqual(node_sync2.peer_height, self.manager2.tx_storage.get_height_best_block())
self.assertEqual(node_sync2.peer_best_block, node_sync2.synced_block)
self.assertEqual(node_sync2.peer_best_block.height, self.manager2.tx_storage.get_height_best_block())
self.assertConsensusEqual(self.manager2, self.manager3)

def test_block_sync_new_blocks_and_txs(self):
Expand All @@ -560,8 +560,8 @@ def test_block_sync_new_blocks_and_txs(self):

node_sync = conn.proto1.state.sync_agent
self.assertEqual(self.manager1.tx_storage.latest_timestamp, manager2.tx_storage.latest_timestamp)
self.assertEqual(node_sync.peer_height, node_sync.synced_height)
self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block())
self.assertEqual(node_sync.peer_best_block, node_sync.synced_block)
self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
self.assertConsensusEqual(self.manager1, manager2)
self.assertConsensusValid(self.manager1)
self.assertConsensusValid(manager2)
Expand All @@ -581,8 +581,8 @@ def test_block_sync_many_new_blocks(self):
self.clock.advance(1)

node_sync = conn.proto1.state.sync_agent
self.assertEqual(node_sync.peer_height, node_sync.synced_height)
self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block())
self.assertEqual(node_sync.peer_best_block, node_sync.synced_block)
self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
self.assertConsensusEqual(self.manager1, manager2)
self.assertConsensusValid(self.manager1)
self.assertConsensusValid(manager2)
Expand All @@ -602,8 +602,8 @@ def test_block_sync_new_blocks(self):
self.clock.advance(1)

node_sync = conn.proto1.state.sync_agent
self.assertEqual(node_sync.peer_height, node_sync.synced_height)
self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block())
self.assertEqual(node_sync.peer_best_block, node_sync.synced_block)
self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
self.assertConsensusEqual(self.manager1, manager2)
self.assertConsensusValid(self.manager1)
self.assertConsensusValid(manager2)
Expand Down Expand Up @@ -664,9 +664,9 @@ def test_full_sync(self):

node_sync1 = conn.proto1.state.sync_agent
node_sync2 = conn.proto2.state.sync_agent
self.assertEqual(node_sync1.peer_height, common_height)
self.assertEqual(node_sync1.synced_height, common_height)
self.assertEqual(node_sync2.peer_height, common_height)
self.assertEqual(node_sync1.peer_best_block.height, common_height)
self.assertEqual(node_sync1.synced_block.height, common_height)
self.assertEqual(node_sync2.peer_best_block.height, common_height)
self.assertConsensusValid(self.manager1)
self.assertConsensusValid(manager2)
self.assertConsensusEqual(self.manager1, manager2)
Expand Down Expand Up @@ -715,9 +715,9 @@ def test_block_sync_checkpoints(self):
node_sync1 = conn.proto1.state.sync_agent
node_sync2 = conn.proto2.state.sync_agent

self.assertEqual(node_sync1.peer_height, TOTAL_BLOCKS)
self.assertEqual(node_sync1.synced_height, TOTAL_BLOCKS)
self.assertEqual(node_sync2.peer_height, len(blocks))
self.assertEqual(node_sync1.peer_best_block.height, TOTAL_BLOCKS)
self.assertEqual(node_sync1.synced_block.height, TOTAL_BLOCKS)
self.assertEqual(node_sync2.peer_best_block.height, len(blocks))
self.assertConsensusValid(self.manager1)
self.assertConsensusValid(manager2)

Expand All @@ -738,8 +738,8 @@ def test_block_sync_only_genesis(self):
self.clock.advance(1)

node_sync = conn.proto1.state.sync_agent
self.assertEqual(node_sync.synced_height, 0)
self.assertEqual(node_sync.peer_height, 0)
self.assertEqual(node_sync.synced_block.height, 0)
self.assertEqual(node_sync.peer_best_block.height, 0)

self.assertEqual(self.manager1.tx_storage.get_vertices_count(), 3)
self.assertEqual(manager2.tx_storage.get_vertices_count(), 3)
Expand Down
Loading