From 6edaf89265e7da047a8faba6c1092ce88285883e Mon Sep 17 00:00:00 2001 From: arvidn Date: Fri, 3 Nov 2023 22:28:50 +0100 Subject: [PATCH] add test performing a deep reorg with 3 nodes, along with various fixes to make it work --- chia/consensus/blockchain.py | 14 +++- chia/full_node/full_node.py | 9 ++- chia/simulator/setup_nodes.py | 27 ++++++-- tests/core/full_node/test_full_node.py | 89 ++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 12 deletions(-) diff --git a/chia/consensus/blockchain.py b/chia/consensus/blockchain.py index 8c0150127944..93ff37ca1bdb 100644 --- a/chia/consensus/blockchain.py +++ b/chia/consensus/blockchain.py @@ -735,13 +735,21 @@ def clean_block_record(self, height: int) -> None: Args: height: Minimum height that we need to keep in the cache """ + if self._peak_height is not None and height > self._peak_height: + height = self._peak_height - self.constants.BLOCKS_CACHE_SIZE if height < 0: return None + ses_heights = self.get_ses_heights()[-3:] blocks_to_remove = self.__heights_in_cache.get(uint32(height), None) while blocks_to_remove is not None and height >= 0: - for header_hash in blocks_to_remove: - del self.__block_records[header_hash] # remove from blocks - del self.__heights_in_cache[uint32(height)] # remove height from heights in cache + # we need to preserve the recent sub-epoch summary blocks in the + # cache, otherwise we fail to create sub epoch segments + if height not in ses_heights: + for header_hash in blocks_to_remove: + # remove from blocks + del self.__block_records[header_hash] + # remove height from heights in cache + del self.__heights_in_cache[uint32(height)] if height == 0: break diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index f19878007c29..413a8fca0261 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -538,7 +538,8 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t ) if first is None or not isinstance(first, full_node_protocol.RespondBlock): self.sync_store.batch_syncing.remove(peer.peer_node_id) - raise ValueError(f"Error short batch syncing, could not fetch block at height {start_height}") + self.log.error(f"Error short batch syncing, could not fetch block at height {start_height}") + return False if not self.blockchain.contains_block(first.block.prev_header_hash): self.log.info("Batch syncing stopped, this is a deep chain") self.sync_store.batch_syncing.remove(peer.peer_node_id) @@ -913,10 +914,11 @@ async def _sync(self) -> None: self.log.debug("long sync started") try: self.log.info("Starting to perform sync.") - self.log.info("Waiting to receive peaks from peers.") # Wait until we have 3 peaks or up to a max of 30 seconds max_iterations = int(self.config.get("max_sync_wait", 30)) * 10 + + self.log.info(f"Waiting to receive peaks from peers. (timeout: {max_iterations/10}s)") peaks = [] for i in range(max_iterations): peaks = [peak.header_hash for peak in self.sync_store.get_peak_of_each_peer().values()] @@ -1109,6 +1111,9 @@ async def validate_block_batches( self.subscriptions.has_ph_subscription, ) await self.hint_store.add_hints(hints_to_add) + # TODO: this logic is a bit problematic. It assumes that + # end_height is also the peak. But we call this function during + # a reorg as well, where the blocks are not (yet) the main chain self.blockchain.clean_block_record(end_height - self.constants.BLOCKS_CACHE_SIZE) batch_queue_input: asyncio.Queue[Optional[Tuple[WSChiaConnection, List[FullBlock]]]] = asyncio.Queue( diff --git a/chia/simulator/setup_nodes.py b/chia/simulator/setup_nodes.py index 384d3382eba5..1f231b18891c 100644 --- a/chia/simulator/setup_nodes.py +++ b/chia/simulator/setup_nodes.py @@ -77,8 +77,11 @@ async def setup_two_nodes( Setup and teardown of two full nodes, with blockchains and separate DBs. """ + config_overrides = {"full_node.max_sync_wait": 0} with TempKeyring(populate=True) as keychain1, TempKeyring(populate=True) as keychain2: - bt1 = await create_block_tools_async(constants=consensus_constants, keychain=keychain1) + bt1 = await create_block_tools_async( + constants=consensus_constants, keychain=keychain1, config_overrides=config_overrides + ) async with setup_full_node( consensus_constants, "blockchain_test.db", @@ -91,7 +94,9 @@ async def setup_two_nodes( consensus_constants, "blockchain_test_2.db", self_hostname, - await create_block_tools_async(constants=consensus_constants, keychain=keychain2), + await create_block_tools_async( + constants=consensus_constants, keychain=keychain2, config_overrides=config_overrides + ), simulator=False, db_version=db_version, ) as service2: @@ -108,6 +113,7 @@ async def setup_n_nodes( """ Setup and teardown of n full nodes, with blockchains and separate DBs. """ + config_overrides = {"full_node.max_sync_wait": 0} with ExitStack() as stack: keychains = [stack.enter_context(TempKeyring(populate=True)) for _ in range(n)] async with AsyncExitStack() as async_exit_stack: @@ -117,7 +123,9 @@ async def setup_n_nodes( consensus_constants, f"blockchain_test_{i}.db", self_hostname, - await create_block_tools_async(constants=consensus_constants, keychain=keychain), + await create_block_tools_async( + constants=consensus_constants, keychain=keychain, config_overrides=config_overrides + ), simulator=False, db_version=db_version, ) @@ -145,8 +153,6 @@ async def setup_simulators_and_wallets( with TempKeyring(populate=True) as keychain1, TempKeyring(populate=True) as keychain2: if config_overrides is None: config_overrides = {} - if "full_node.max_sync_wait" not in config_overrides: - config_overrides["full_node.max_sync_wait"] = 1 async with setup_simulators_and_wallets_inner( db_version, consensus_constants, @@ -223,6 +229,8 @@ async def setup_simulators_and_wallets_inner( ) -> AsyncIterator[ Tuple[List[BlockTools], List[Service[FullNode, FullNodeSimulator]], List[Service[WalletNode, WalletNodeAPI]]] ]: + if config_overrides is not None and "full_node.max_sync_wait" not in config_overrides: + config_overrides["full_node.max_sync_wait"] = 0 async with AsyncExitStack() as async_exit_stack: bt_tools: List[BlockTools] = [ await create_block_tools_async(consensus_constants, keychain=keychain1, config_overrides=config_overrides) @@ -337,10 +345,15 @@ async def setup_full_system_inner( keychain2: Keychain, shared_b_tools: BlockTools, ) -> AsyncIterator[FullSystem]: + config_overrides = {"full_node.max_sync_wait": 0} if b_tools is None: - b_tools = await create_block_tools_async(constants=consensus_constants, keychain=keychain1) + b_tools = await create_block_tools_async( + constants=consensus_constants, keychain=keychain1, config_overrides=config_overrides + ) if b_tools_1 is None: - b_tools_1 = await create_block_tools_async(constants=consensus_constants, keychain=keychain2) + b_tools_1 = await create_block_tools_async( + constants=consensus_constants, keychain=keychain2, config_overrides=config_overrides + ) self_hostname = shared_b_tools.config["self_hostname"] diff --git a/tests/core/full_node/test_full_node.py b/tests/core/full_node/test_full_node.py index 63aaeb96bc13..089192c94e32 100644 --- a/tests/core/full_node/test_full_node.py +++ b/tests/core/full_node/test_full_node.py @@ -2150,3 +2150,92 @@ async def test_wallet_sync_task_failure( assert "update_wallets - fork_height: 10, peak_height: 0" in caplog.text assert "Wallet sync task failure" not in caplog.text assert not full_node.wallet_sync_task.done() + + +@pytest.mark.asyncio +@pytest.mark.limit_consensus_modes(reason="this test is too slow (for now)") +async def test_long_reorg_nodes( + three_nodes, + default_10000_blocks: List[FullBlock], + test_long_reorg_blocks: List[FullBlock], + self_hostname: str, + seeded_random: random.Random, +): + full_node_1, full_node_2, full_node_3 = three_nodes + + blocks = default_10000_blocks[:1600] + + reorg_blocks = test_long_reorg_blocks[:1200] + + # full node 1 has the original chain + for block_batch in to_batches(blocks, 64): + b = block_batch.entries[0] + if (b.height % 128) == 0: + print(f"main chain: {b.height:4} weight: {b.weight}") + await full_node_1.full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 8884), None) + + # full node 2 has the reorg-chain + for block_batch in to_batches(reorg_blocks[:-1], 64): + b = block_batch.entries[0] + if (b.height % 128) == 0: + print(f"reorg chain: {b.height:4} weight: {b.weight}") + await full_node_2.full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 8884), None) + + await connect_and_get_peer(full_node_1.full_node.server, full_node_2.full_node.server, self_hostname) + + # TODO: There appears to be an issue where the node with the lighter chain + # fails to initiate the reorg until there's a new block farmed onto the + # heavier chain. + await full_node_2.full_node.add_block(reorg_blocks[-1]) + + def check_nodes_in_sync(): + try: + p1 = full_node_2.full_node.blockchain.get_peak() + p2 = full_node_1.full_node.blockchain.get_peak() + return p1 == p2 + except Exception as e: + print(f"e: {e}") + return False + + await time_out_assert(120, check_nodes_in_sync) + peak = full_node_2.full_node.blockchain.get_peak() + print(f"peak: {str(peak.header_hash)[:6]}") + + p1 = full_node_1.full_node.blockchain.get_peak() + p2 = full_node_2.full_node.blockchain.get_peak() + + assert p1.header_hash == reorg_blocks[-1].header_hash + assert p2.header_hash == reorg_blocks[-1].header_hash + + blocks = default_10000_blocks[:4000] + + # full node 3 has the original chain, but even longer + for block_batch in to_batches(blocks, 64): + b = block_batch.entries[0] + if (b.height % 128) == 0: + print(f"main chain: {b.height:4} weight: {b.weight}") + await full_node_3.full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 8884), None) + + print("connecting node 3") + await connect_and_get_peer(full_node_3.full_node.server, full_node_1.full_node.server, self_hostname) + # await connect_and_get_peer(full_node_3.full_node.server, full_node_2.full_node.server, self_hostname) + + def check_nodes_in_sync2(): + try: + p1 = full_node_1.full_node.blockchain.get_peak() + # p2 = full_node_2.full_node.blockchain.get_peak() + p3 = full_node_3.full_node.blockchain.get_peak() + return p1.header_hash == p3.header_hash + except Exception as e: + print(f"e: {e}") + return False + + await time_out_assert(400, check_nodes_in_sync2) + + p1 = full_node_1.full_node.blockchain.get_peak() + # p2 = full_node_2.full_node.blockchain.get_peak() + p3 = full_node_3.full_node.blockchain.get_peak() + + assert p1.header_hash == blocks[-1].header_hash + # assert p2.header_hash == blocks[-1].header_hash + assert p3.header_hash == blocks[-1].header_hash