Skip to content

Commit

Permalink
add test performing a deep reorg with 3 nodes, along with various fixes to make it work
Browse files Browse the repository at this point in the history
  • Loading branch information
arvidn committed Nov 5, 2023
1 parent eea6561 commit 00827d0
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 4 deletions.
12 changes: 9 additions & 3 deletions chia/consensus/blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,13 +735,19 @@ def clean_block_record(self, height: int) -> None:
Args:
height: Minimum height that we need to keep in the cache
"""
if self._peak_height is not None and height > self._peak_height - self.constants.BLOCKS_CACHE_SIZE:
height = self._peak_height - self.constants.BLOCKS_CACHE_SIZE
if height < 0:
return None
ses_heights = self.get_ses_heights()[-4:]
blocks_to_remove = self.__heights_in_cache.get(uint32(height), None)
while blocks_to_remove is not None and height >= 0:
for header_hash in blocks_to_remove:
del self.__block_records[header_hash] # remove from blocks
del self.__heights_in_cache[uint32(height)] # remove height from heights in cache
if height not in ses_heights:
for header_hash in blocks_to_remove:
# remove from blocks
del self.__block_records[header_hash]
# remove height from heights in cache
del self.__heights_in_cache[uint32(height)]

if height == 0:
break
Expand Down
12 changes: 11 additions & 1 deletion chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,8 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t
)
if first is None or not isinstance(first, full_node_protocol.RespondBlock):
self.sync_store.batch_syncing.remove(peer.peer_node_id)
raise ValueError(f"Error short batch syncing, could not fetch block at height {start_height}")
self.log.error(f"Error short batch syncing, could not fetch block at height {start_height}")
return False
if not self.blockchain.contains_block(first.block.prev_header_hash):
self.log.info("Batch syncing stopped, this is a deep chain")
self.sync_store.batch_syncing.remove(peer.peer_node_id)
Expand Down Expand Up @@ -716,6 +717,10 @@ async def new_peak(self, request: full_node_protocol.NewPeak, peer: WSChiaConnec
return None

if request.height < curr_peak_height + self.config["sync_blocks_behind_threshold"]:
# TODO: We get here if we encountered a heavier peak with a
# lower height than ours. We don't seem to handle this case
# right now. This ends up requesting the block at *our* peak
# height.
# This case of being behind but not by so much
if await self.short_sync_batch(peer, uint32(max(curr_peak_height - 6, 0)), request.height):
return None
Expand Down Expand Up @@ -1110,6 +1115,11 @@ async def validate_block_batches(
self.subscriptions.has_ph_subscription,
)
await self.hint_store.add_hints(hints_to_add)
# Note that end_height is not necessarily the peak at this
# point. In case of a re-org, it may even be significantly
# higher than _peak_height, and still not be the peak.
# clean_block_record() will not necessarily honor this cut-off
# height, in that case.
self.blockchain.clean_block_record(end_height - self.constants.BLOCKS_CACHE_SIZE)

batch_queue_input: asyncio.Queue[Optional[Tuple[WSChiaConnection, List[FullBlock]]]] = asyncio.Queue(
Expand Down
96 changes: 96 additions & 0 deletions tests/core/full_node/test_full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import random
import time
import traceback
from typing import Coroutine, Dict, List, Optional, Tuple

import pytest
Expand Down Expand Up @@ -2150,3 +2151,98 @@ async def test_wallet_sync_task_failure(
assert "update_wallets - fork_height: 10, peak_height: 0" in caplog.text
assert "Wallet sync task failure" not in caplog.text
assert not full_node.wallet_sync_task.done()


@pytest.mark.anyio
@pytest.mark.limit_consensus_modes(reason="this test is too slow (for now)")
async def test_long_reorg_nodes(
    three_nodes,
    default_10000_blocks: List[FullBlock],
    test_long_reorg_blocks: List[FullBlock],
    self_hostname: str,
    seeded_random: random.Random,
):
    """Exercise a deep re-org across three full nodes.

    Node 1 starts on the main chain, node 2 on a shorter-but-heavier fork.
    After they converge on the fork, node 3 joins with a much longer
    extension of the original chain, forcing everyone back onto it.
    """
    full_node_1, full_node_2, full_node_3 = three_nodes

    # Node 1 gets the first 1600 blocks of the main chain; node 2 gets the
    # first 1200 blocks of the fork chain (heavier despite being shorter).
    blocks = default_10000_blocks[:1600]
    reorg_blocks = test_long_reorg_blocks[:1200]

    # full node 1 has the original chain
    for block_batch in to_batches(blocks, 64):
        b = block_batch.entries[0]
        if (b.height % 128) == 0:
            print(f"main chain: {b.height:4} weight: {b.weight}")
        await full_node_1.full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 8884), None)

    # full node 2 has the reorg-chain, withholding its final block for now
    for block_batch in to_batches(reorg_blocks[:-1], 64):
        b = block_batch.entries[0]
        if (b.height % 128) == 0:
            print(f"reorg chain: {b.height:4} weight: {b.weight}")
        await full_node_2.full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 8884), None)

    await connect_and_get_peer(full_node_1.full_node.server, full_node_2.full_node.server, self_hostname)

    # TODO: There appears to be an issue where the node with the lighter chain
    # fails to initiate the reorg until there's a new block farmed onto the
    # heavier chain.
    await full_node_2.full_node.add_block(reorg_blocks[-1])

    def check_nodes_in_sync():
        try:
            p1 = full_node_2.full_node.blockchain.get_peak()
            p2 = full_node_1.full_node.blockchain.get_peak()
            return p1 == p2
        except Exception as e:
            # TODO: understand why we get an exception here sometimes. Fix it or
            # add comment explaining why we need to catch here
            traceback.print_exc()
            print(f"e: {e}")
            return False

    await time_out_assert(120, check_nodes_in_sync)
    peak = full_node_2.full_node.blockchain.get_peak()
    print(f"peak: {str(peak.header_hash)[:6]}")

    # Both nodes must now sit on the tip of the heavier fork chain.
    p1 = full_node_1.full_node.blockchain.get_peak()
    p2 = full_node_2.full_node.blockchain.get_peak()

    assert p1.header_hash == reorg_blocks[-1].header_hash
    assert p2.header_hash == reorg_blocks[-1].header_hash

    # Phase two: node 3 carries the original chain extended far past the fork.
    blocks = default_10000_blocks[:4000]

    # full node 3 has the original chain, but even longer
    for block_batch in to_batches(blocks, 64):
        b = block_batch.entries[0]
        if (b.height % 128) == 0:
            print(f"main chain: {b.height:4} weight: {b.weight}")
        await full_node_3.full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 8884), None)

    print("connecting node 3")
    await connect_and_get_peer(full_node_3.full_node.server, full_node_1.full_node.server, self_hostname)
    # await connect_and_get_peer(full_node_3.full_node.server, full_node_2.full_node.server, self_hostname)

    def check_nodes_in_sync2():
        try:
            p1 = full_node_1.full_node.blockchain.get_peak()
            # p2 = full_node_2.full_node.blockchain.get_peak()
            p3 = full_node_3.full_node.blockchain.get_peak()
            return p1.header_hash == p3.header_hash
        except Exception as e:
            # TODO: understand why we get an exception here sometimes. Fix it or
            # add comment explaining why we need to catch here
            traceback.print_exc()
            print(f"e: {e}")
            return False

    await time_out_assert(950, check_nodes_in_sync2)

    # Everyone (node 2 intentionally unchecked — see commented lines) must be
    # back on the original, now-longest chain.
    p1 = full_node_1.full_node.blockchain.get_peak()
    # p2 = full_node_2.full_node.blockchain.get_peak()
    p3 = full_node_3.full_node.blockchain.get_peak()

    assert p1.header_hash == blocks[-1].header_hash
    # assert p2.header_hash == blocks[-1].header_hash
    assert p3.header_hash == blocks[-1].header_hash

0 comments on commit 00827d0

Please sign in to comment.