Commit
add test performing a deep reorg with 3 nodes, along with various fixes to make it work
arvidn committed Nov 4, 2023
1 parent c908b25 commit f96c98d
Showing 4 changed files with 132 additions and 12 deletions.
14 changes: 11 additions & 3 deletions chia/consensus/blockchain.py
@@ -735,13 +735,21 @@ def clean_block_record(self, height: int) -> None:
         Args:
             height: Minimum height that we need to keep in the cache
         """
         if self._peak_height is not None and height > self._peak_height:
             height = self._peak_height - self.constants.BLOCKS_CACHE_SIZE
         if height < 0:
             return None
+        ses_heights = self.get_ses_heights()[-3:]
         blocks_to_remove = self.__heights_in_cache.get(uint32(height), None)
         while blocks_to_remove is not None and height >= 0:
-            for header_hash in blocks_to_remove:
-                del self.__block_records[header_hash]  # remove from blocks
-            del self.__heights_in_cache[uint32(height)]  # remove height from heights in cache
+            # we need to preserve the recent sub-epoch summary blocks in the
+            # cache, otherwise we fail to create sub epoch segments
+            if height not in ses_heights:
+                for header_hash in blocks_to_remove:
+                    # remove from blocks
+                    del self.__block_records[header_hash]
+                # remove height from heights in cache
+                del self.__heights_in_cache[uint32(height)]

             if height == 0:
                 break
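The fix above changes cache pruning so that block records at the last three sub-epoch-summary heights survive eviction; without them, building sub-epoch segments for syncing peers fails. The following is a minimal stand-in for that rule, using plain dicts instead of the real `Blockchain` internals and a hypothetical `prune_cache` helper:

```python
from typing import Dict, List, Set


def prune_cache(
    heights_in_cache: Dict[int, List[str]],  # height -> header hashes cached at that height
    block_records: Dict[str, object],        # header hash -> cached block record
    cutoff: int,                             # highest height to evict
    ses_heights: Set[int],                   # recent sub-epoch summary heights to preserve
) -> None:
    # Walk from the cutoff down to genesis, evicting every height except
    # those holding a recent sub-epoch summary block, mirroring the new
    # behavior of clean_block_record.
    for height in range(cutoff, -1, -1):
        if height in ses_heights:
            continue  # still needed to create sub-epoch segments
        for header_hash in heights_in_cache.get(height, []):
            del block_records[header_hash]
        heights_in_cache.pop(height, None)


# Heights 0-5 cached, sub-epoch summary at height 3, evict up to height 4:
cache = {h: [f"hash{h}"] for h in range(6)}
records = {f"hash{h}": object() for h in range(6)}
prune_cache(cache, records, cutoff=4, ses_heights={3})
assert sorted(cache) == [3, 5]  # height 3 preserved, height 5 above the cutoff
```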
9 changes: 7 additions & 2 deletions chia/full_node/full_node.py
@@ -538,7 +538,8 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t
             )
             if first is None or not isinstance(first, full_node_protocol.RespondBlock):
                 self.sync_store.batch_syncing.remove(peer.peer_node_id)
-                raise ValueError(f"Error short batch syncing, could not fetch block at height {start_height}")
+                self.log.error(f"Error short batch syncing, could not fetch block at height {start_height}")
+                return False
             if not self.blockchain.contains_block(first.block.prev_header_hash):
                 self.log.info("Batch syncing stopped, this is a deep chain")
                 self.sync_store.batch_syncing.remove(peer.peer_node_id)
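This hunk downgrades a fetch failure from an exception to a logged error with a `False` return, so a misbehaving peer no longer unwinds the whole message handler: `short_sync_batch` already reports success through its boolean result. A sketch of the calling pattern under that assumption (hypothetical wrapper, simplified signature):

```python
async def sync_from_peer(node, peer, start_height: int, target_height: int) -> None:
    # A False result now covers "could not fetch a block" as well as
    # "chain too deep", so a single fallback path handles both cases.
    if await node.short_sync_batch(peer, start_height, target_height):
        return
    await node._sync()  # fall back to the full long-sync machinery
```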
@@ -913,10 +914,11 @@ async def _sync(self) -> None:
         self.log.debug("long sync started")
         try:
             self.log.info("Starting to perform sync.")
-            self.log.info("Waiting to receive peaks from peers.")

             # Wait until we have 3 peaks or up to a max of 30 seconds
             max_iterations = int(self.config.get("max_sync_wait", 30)) * 10

+            self.log.info(f"Waiting to receive peaks from peers. (timeout: {max_iterations/10}s)")
             peaks = []
             for i in range(max_iterations):
                 peaks = [peak.header_hash for peak in self.sync_store.get_peak_of_each_peer().values()]
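The relocated log line now reports the actual timeout: `max_sync_wait` is expressed in seconds and each polling iteration sleeps 100 ms, hence the `* 10`. A self-contained sketch of the loop (simplified, with a hypothetical `get_peaks` callback), which also shows why the test setup below pins `full_node.max_sync_wait` to 0, making the wait a no-op:

```python
import asyncio
from typing import Callable, List


async def wait_for_peaks(
    get_peaks: Callable[[], List[bytes]],
    max_sync_wait: int,  # seconds, e.g. config.get("max_sync_wait", 30)
    min_peaks: int = 3,
) -> List[bytes]:
    # Poll every 100 ms until enough peer peaks arrive or the timeout lapses;
    # with max_sync_wait == 0 the range is empty and we return immediately.
    max_iterations = max_sync_wait * 10
    peaks: List[bytes] = []
    for _ in range(max_iterations):
        peaks = get_peaks()
        if len(peaks) >= min_peaks:
            break
        await asyncio.sleep(0.1)
    return peaks
```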
@@ -1109,6 +1111,9 @@ async def validate_block_batches(
                     self.subscriptions.has_ph_subscription,
                 )
                 await self.hint_store.add_hints(hints_to_add)
+                # TODO: this logic is a bit problematic. It assumes that
+                # end_height is also the peak. But we call this function during
+                # a reorg as well, where the blocks are not (yet) in the main chain
                 self.blockchain.clean_block_record(end_height - self.constants.BLOCKS_CACHE_SIZE)

         batch_queue_input: asyncio.Queue[Optional[Tuple[WSChiaConnection, List[FullBlock]]]] = asyncio.Queue(
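One way to read the new TODO: during a reorg, `end_height` counts fork blocks rather than the node's peak, so pruning relative to it can evict cached records the live chain still needs. Purely as an illustration of a peak-anchored cutoff, not what this commit implements:

```python
from typing import Optional


def prune_cutoff(peak_height: Optional[int], end_height: int, cache_size: int) -> int:
    # While a fork is still being validated, anchor the cutoff to the lower
    # of the validated batch height and the actual peak, so main-chain
    # records are not pruned on the fork's behalf.
    anchor = end_height if peak_height is None else min(end_height, peak_height)
    return anchor - cache_size
```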
27 changes: 20 additions & 7 deletions chia/simulator/setup_nodes.py
@@ -77,8 +77,11 @@ async def setup_two_nodes(
     Setup and teardown of two full nodes, with blockchains and separate DBs.
     """

+    config_overrides = {"full_node.max_sync_wait": 0}
     with TempKeyring(populate=True) as keychain1, TempKeyring(populate=True) as keychain2:
-        bt1 = await create_block_tools_async(constants=consensus_constants, keychain=keychain1)
+        bt1 = await create_block_tools_async(
+            constants=consensus_constants, keychain=keychain1, config_overrides=config_overrides
+        )
         async with setup_full_node(
             consensus_constants,
             "blockchain_test.db",
@@ -91,7 +94,9 @@ async def setup_two_nodes(
                 consensus_constants,
                 "blockchain_test_2.db",
                 self_hostname,
-                await create_block_tools_async(constants=consensus_constants, keychain=keychain2),
+                await create_block_tools_async(
+                    constants=consensus_constants, keychain=keychain2, config_overrides=config_overrides
+                ),
                 simulator=False,
                 db_version=db_version,
             ) as service2:
@@ -108,6 +113,7 @@ async def setup_n_nodes(
     """
     Setup and teardown of n full nodes, with blockchains and separate DBs.
     """
+    config_overrides = {"full_node.max_sync_wait": 0}
     with ExitStack() as stack:
         keychains = [stack.enter_context(TempKeyring(populate=True)) for _ in range(n)]
         async with AsyncExitStack() as async_exit_stack:
@@ -117,7 +123,9 @@
                         consensus_constants,
                         f"blockchain_test_{i}.db",
                         self_hostname,
-                        await create_block_tools_async(constants=consensus_constants, keychain=keychain),
+                        await create_block_tools_async(
+                            constants=consensus_constants, keychain=keychain, config_overrides=config_overrides
+                        ),
                         simulator=False,
                         db_version=db_version,
                     )
@@ -145,8 +153,6 @@ async def setup_simulators_and_wallets(
     with TempKeyring(populate=True) as keychain1, TempKeyring(populate=True) as keychain2:
         if config_overrides is None:
             config_overrides = {}
-        if "full_node.max_sync_wait" not in config_overrides:
-            config_overrides["full_node.max_sync_wait"] = 1
         async with setup_simulators_and_wallets_inner(
             db_version,
             consensus_constants,
@@ -223,6 +229,8 @@
 ) -> AsyncIterator[
     Tuple[List[BlockTools], List[Service[FullNode, FullNodeSimulator]], List[Service[WalletNode, WalletNodeAPI]]]
 ]:
+    if config_overrides is not None and "full_node.max_sync_wait" not in config_overrides:
+        config_overrides["full_node.max_sync_wait"] = 0
     async with AsyncExitStack() as async_exit_stack:
         bt_tools: List[BlockTools] = [
             await create_block_tools_async(consensus_constants, keychain=keychain1, config_overrides=config_overrides)
@@ -337,10 +345,15 @@ async def setup_full_system_inner(
     keychain2: Keychain,
     shared_b_tools: BlockTools,
 ) -> AsyncIterator[FullSystem]:
+    config_overrides = {"full_node.max_sync_wait": 0}
     if b_tools is None:
-        b_tools = await create_block_tools_async(constants=consensus_constants, keychain=keychain1)
+        b_tools = await create_block_tools_async(
+            constants=consensus_constants, keychain=keychain1, config_overrides=config_overrides
+        )
     if b_tools_1 is None:
-        b_tools_1 = await create_block_tools_async(constants=consensus_constants, keychain=keychain2)
+        b_tools_1 = await create_block_tools_async(
+            constants=consensus_constants, keychain=keychain2, config_overrides=config_overrides
+        )

     self_hostname = shared_b_tools.config["self_hostname"]
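Every setup path above now threads the same `{"full_node.max_sync_wait": 0}` override into `create_block_tools_async`, so test nodes skip the peak-waiting delay entirely. Assuming the dotted `section.key` convention these overrides use, the merge works roughly like this (hypothetical helper, not the actual BlockTools code):

```python
from typing import Any, Dict


def apply_config_overrides(config: Dict[str, Any], overrides: Dict[str, Any]) -> None:
    # "full_node.max_sync_wait" -> config["full_node"]["max_sync_wait"]
    for dotted_key, value in overrides.items():
        *path, leaf = dotted_key.split(".")
        node = config
        for part in path:
            node = node.setdefault(part, {})
        node[leaf] = value


config: Dict[str, Any] = {"full_node": {"max_sync_wait": 30}}
apply_config_overrides(config, {"full_node.max_sync_wait": 0})
assert config["full_node"]["max_sync_wait"] == 0
```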
94 changes: 94 additions & 0 deletions tests/core/full_node/test_full_node.py
@@ -2150,3 +2150,97 @@ async def test_wallet_sync_task_failure(
     assert "update_wallets - fork_height: 10, peak_height: 0" in caplog.text
     assert "Wallet sync task failure" not in caplog.text
     assert not full_node.wallet_sync_task.done()
+
+
+@pytest.mark.asyncio
+@pytest.mark.limit_consensus_modes(reason="this test is too slow (for now)")
+async def test_long_reorg_nodes(
+    three_nodes,
+    default_10000_blocks: List[FullBlock],
+    test_long_reorg_blocks: List[FullBlock],
+    self_hostname: str,
+    seeded_random: random.Random,
+):
+    full_node_1, full_node_2, full_node_3 = three_nodes
+
+    blocks = default_10000_blocks[:1600]
+
+    reorg_blocks = test_long_reorg_blocks[:1200]
+
+    # full node 1 has the original chain
+    for block_batch in to_batches(blocks, 64):
+        b = block_batch.entries[0]
+        if (b.height % 128) == 0:
+            print(f"main chain: {b.height:4} weight: {b.weight}")
+        await full_node_1.full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 8884), None)
+
+    # full node 2 has the reorg-chain
+    for block_batch in to_batches(reorg_blocks[:-1], 64):
+        b = block_batch.entries[0]
+        if (b.height % 128) == 0:
+            print(f"reorg chain: {b.height:4} weight: {b.weight}")
+        await full_node_2.full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 8884), None)
+
+    await connect_and_get_peer(full_node_1.full_node.server, full_node_2.full_node.server, self_hostname)
+
+    # TODO: There appears to be an issue where the node with the lighter chain
+    # fails to initiate the reorg until there's a new block farmed onto the
+    # heavier chain.
+    await full_node_2.full_node.add_block(reorg_blocks[-1])
+
+    def check_nodes_in_sync():
+        try:
+            p1 = full_node_2.full_node.blockchain.get_peak()
+            p2 = full_node_1.full_node.blockchain.get_peak()
+            return p1 == p2
+        except Exception as e:
+            print(f"e: {e}")
+            return False
+
+    await time_out_assert(120, check_nodes_in_sync)
+    peak = full_node_2.full_node.blockchain.get_peak()
+    print(f"peak: {str(peak.header_hash)[:6]}")
+
+    p1 = full_node_1.full_node.blockchain.get_peak()
+    p2 = full_node_2.full_node.blockchain.get_peak()
+
+    assert p1.header_hash == reorg_blocks[-1].header_hash
+    assert p2.header_hash == reorg_blocks[-1].header_hash
+
+    blocks = default_10000_blocks[:4000]
+
+    # full node 3 has the original chain, but even longer
+    for block_batch in to_batches(blocks, 64):
+        b = block_batch.entries[0]
+        if (b.height % 128) == 0:
+            print(f"main chain: {b.height:4} weight: {b.weight}")
+        await full_node_3.full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 8884), None)
+
+    print("connecting node 3")
+    await connect_and_get_peer(full_node_3.full_node.server, full_node_1.full_node.server, self_hostname)
+    # await connect_and_get_peer(full_node_3.full_node.server, full_node_2.full_node.server, self_hostname)
+
+    def check_nodes_in_sync2():
+        try:
+            p1 = full_node_1.full_node.blockchain.get_peak()
+            # p2 = full_node_2.full_node.blockchain.get_peak()
+            p3 = full_node_3.full_node.blockchain.get_peak()
+            if p1.height > 3500 or (p1.height % 100) == 0:
+                print(f"1: {p1.height} {str(p1.header_hash)[:6]} ", end="")
+                # print(f"2: {p2.height} {str(p2.header_hash)[:6]} ", end="")
+                print(f"3: {p3.height} {str(p3.header_hash)[:6]}")
+            # return p1 == p2 and p1 == p3
+            return p1.header_hash == p3.header_hash
+        except Exception as e:
+            print(f"e: {e}")
+            return False
+
+    await time_out_assert(900, check_nodes_in_sync2)
+
+    p1 = full_node_1.full_node.blockchain.get_peak()
+    # p2 = full_node_2.full_node.blockchain.get_peak()
+    p3 = full_node_3.full_node.blockchain.get_peak()
+
+    assert p1.header_hash == blocks[-1].header_hash
+    # assert p2.header_hash == blocks[-1].header_hash
+    assert p3.header_hash == blocks[-1].header_hash
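The test leans on `time_out_assert` to poll each predicate until the nodes converge. For readers outside the chia test suite, it behaves roughly like the following sketch (simplified signature, not the real helper):

```python
import asyncio
import time
from typing import Any, Callable


async def time_out_assert(timeout: float, predicate: Callable[[], Any], expected: Any = True) -> None:
    # Re-evaluate the predicate every 500 ms until it yields the expected
    # value; raise if the deadline passes first.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = predicate()
        if asyncio.iscoroutine(result):
            result = await result
        if result == expected:
            return
        await asyncio.sleep(0.5)
    raise AssertionError(f"timed out after {timeout}s waiting for {getattr(predicate, '__name__', predicate)}")
```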
