Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions hathor/p2p/sync_v2/blockchain_streaming_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,17 @@ def handle_blocks(self, blk: Block) -> None:

# Check for repeated blocks.
is_duplicated = False
if self.tx_storage.partial_vertex_exists(blk.hash):
# We reached a block we already have. Skip it.
self._blk_repeated += 1
is_duplicated = True
if self._blk_repeated > self.max_repeated_blocks:
self.log.info('too many repeated block received', total_repeated=self._blk_repeated)
self.fails(TooManyRepeatedVerticesError())
self._last_received_block = blk
return
if (blk_meta := self.tx_storage.get_metadata(blk.hash)) is not None:
# XXX: check whether the block is part of the best chain
if not blk_meta.voided_by:
# We reached a block we already have. Skip it.
self._blk_repeated += 1
if self._blk_repeated > self.max_repeated_blocks:
self.log.info('too many repeated block received', total_repeated=self._blk_repeated)
self.fails(TooManyRepeatedVerticesError())
is_duplicated = True
self._last_received_block = blk
return

# basic linearity validation, crucial for correctly predicting the next block's height
if self._reverse:
Expand Down
67 changes: 67 additions & 0 deletions hathor_tests/p2p/test_sync_v2.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
import re
from typing import cast
from unittest.mock import patch

from twisted.internet.defer import Deferred, succeed
Expand All @@ -9,6 +10,8 @@
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.states import ReadyState
from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo
from hathor.p2p.sync_v2.blockchain_streaming_client import BlockchainStreamingClient
from hathor.p2p.sync_v2.exception import StreamingError
from hathor.simulator import FakeConnection
from hathor.simulator.trigger import (
StopAfterNMinedBlocks,
Expand All @@ -17,6 +20,7 @@
StopWhenTrue,
Trigger,
)
from hathor.transaction import Block
from hathor.transaction.storage import TransactionRocksDBStorage
from hathor.transaction.storage.transaction_storage import TransactionStorage
from hathor.transaction.storage.traversal import DFSWalk
Expand Down Expand Up @@ -284,6 +288,69 @@ def test_receiving_tips_limit(self) -> None:
# and also the second node should have aborted the connection
self.assertTrue(conn12.proto2.aborting)

def test_sync_v2_reorg_stuck_on_repeated_blocks(self) -> None:
manager = self.create_peer()

dag_builder = TestDAGBuilder.from_manager(manager)
artifacts = dag_builder.build_from_str("""
blockchain genesis b[0..5]
blockchain b5 lose[1..11]
blockchain b5 win[1..12]
""")

# Load the losing chain.
for node, vertex in artifacts.list:
if node.name.startswith('lose') or node.name.startswith('b'):
cloned = vertex.clone(include_metadata=True, include_storage=False)
assert manager.vertex_handler.on_new_relayed_vertex(cloned)

# Simulate a previous partial sync by adding 10 winning blocks, but not the one that would reorg.
for i in range(1, 11):
win_blk = artifacts.get_typed_vertex(f'win{i}', Block)
cloned = win_blk.clone(include_metadata=False, include_storage=False)
assert manager.vertex_handler.on_new_relayed_vertex(cloned)
assert cloned.get_metadata().voided_by == {cloned.hash}

win11 = artifacts.get_typed_vertex('win11', Block)
win12 = artifacts.get_typed_vertex('win12', Block)
start_block = artifacts.get_typed_vertex('b5', Block)

self.assertFalse(manager.tx_storage.transaction_exists(win11.hash))
self.assertFalse(manager.tx_storage.transaction_exists(win12.hash))

start_info = _HeightInfo(height=start_block.get_height(), id=start_block.hash)
end_info = _HeightInfo(height=win12.get_height(), id=win12.hash)

class DummyProtocol:
def get_short_peer_id(self) -> str:
return 'dummy'

class DummySync:
def __init__(self) -> None:
self.protocol = DummyProtocol()
self.tx_storage = manager.tx_storage
self.vertex_handler = manager.vertex_handler

client = BlockchainStreamingClient(cast(NodeBlockSync, DummySync()), start_info, end_info)

errors: list[StreamingError] = []
client.wait().addErrback(lambda failure: errors.append(failure.value))

# Restarted stream re-sends the start block and the 10 already-downloaded winning blocks before the new ones.
stream: list[Block] = [start_block] + [
artifacts.get_typed_vertex(f'win{i}', Block) for i in range(1, 13)
]
for blk in stream:
client.handle_blocks(blk)
if errors:
break

self.assertFalse(errors, 'should stream without hitting repeated-block guard')
self.assertTrue(manager.tx_storage.transaction_exists(win11.hash))
self.assertTrue(manager.tx_storage.transaction_exists(win12.hash))
best_block = manager.tx_storage.get_best_block()
self.assertEqual(best_block.hash, win12.hash)

def _prepare_sync_v2_find_best_common_block_reorg(self) -> FakeConnection:
manager1 = self.create_peer()
manager1.allow_mining_without_peers()
Expand Down
Loading