From 038f5e8fb7ad945a3eb57fd260635146fa92de79 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Wed, 15 Nov 2023 17:51:04 +0100 Subject: [PATCH] Fix deep reorgs (#16594) --- .github/workflows/benchmarks.yml | 2 +- .github/workflows/test-single.yml | 2 +- chia/consensus/block_body_validation.py | 159 +++++---- chia/consensus/blockchain.py | 308 ++++++++++++++---- chia/consensus/blockchain_interface.py | 6 + chia/consensus/find_fork_point.py | 96 +++++- chia/consensus/multiprocess_validation.py | 32 +- chia/full_node/block_store.py | 19 ++ chia/full_node/full_node.py | 87 ++++- chia/simulator/block_tools.py | 133 +++++++- chia/util/block_cache.py | 9 + chia/wallet/wallet_blockchain.py | 22 +- tests/blockchain/blockchain_test_utils.py | 34 +- tests/blockchain/config.py | 2 +- tests/blockchain/test_blockchain.py | 171 ++++++++-- tests/blockchain/test_lookup_fork_chain.py | 186 +++++++++++ tests/conftest.py | 41 ++- .../core/full_node/stores/test_block_store.py | 32 ++ .../full_node/stores/test_full_node_store.py | 4 +- tests/core/full_node/test_full_node.py | 117 ++++++- tests/plot_sync/util.py | 3 + tests/util/blockchain.py | 20 +- 22 files changed, 1229 insertions(+), 256 deletions(-) create mode 100644 tests/blockchain/test_lookup_fork_chain.py diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index 35876ae0090a..8057e79bb8d0 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -60,7 +60,7 @@ jobs: python-version: [ "3.10" ] env: CHIA_ROOT: ${{ github.workspace }}/.chia/mainnet - BLOCKS_AND_PLOTS_VERSION: 0.33.0 + BLOCKS_AND_PLOTS_VERSION: 0.38.0 steps: - name: Clean workspace diff --git a/.github/workflows/test-single.yml b/.github/workflows/test-single.yml index 38ecb6e75bf1..9d1f0da26072 100644 --- a/.github/workflows/test-single.yml +++ b/.github/workflows/test-single.yml @@ -113,7 +113,7 @@ jobs: CHIA_ROOT: ${{ github.workspace }}/.chia/mainnet CHIA_SIMULATOR_ROOT: ${{ github.workspace }}/.chia/simulator JOB_FILE_NAME: tests_${{ matrix.os.file_name }}_python-${{ matrix.python.file_name }}_${{ matrix.configuration.name }} - BLOCKS_AND_PLOTS_VERSION: 0.33.0 + BLOCKS_AND_PLOTS_VERSION: 0.38.0 steps: - name: Configure git diff --git a/chia/consensus/block_body_validation.py b/chia/consensus/block_body_validation.py index 286645e79d86..9a536a11e663 100644 --- a/chia/consensus/block_body_validation.py +++ b/chia/consensus/block_body_validation.py @@ -2,6 +2,7 @@ import collections import logging +from dataclasses import dataclass, field from typing import Awaitable, Callable, Dict, List, Optional, Set, Tuple, Union from chiabip158 import PyBIP158 @@ -13,10 +14,9 @@ from chia.consensus.coinbase import create_farmer_coin, create_pool_coin from chia.consensus.constants import ConsensusConstants from chia.consensus.cost_calculator import NPCResult -from chia.consensus.find_fork_point import find_fork_point_in_chain from chia.full_node.block_store import BlockStore from chia.full_node.coin_store import CoinStore -from chia.full_node.mempool_check_conditions import get_name_puzzle_conditions, mempool_check_time_locks +from chia.full_node.mempool_check_conditions import mempool_check_time_locks from chia.types.block_protocol import BlockInfo from chia.types.blockchain_format.coin import Coin from chia.types.blockchain_format.sized_bytes import bytes32, bytes48 @@ -27,12 +27,59 @@ from chia.util import cached_bls from chia.util.condition_tools import pkm_pairs from chia.util.errors import Err -from chia.util.generator_tools import 
tx_removals_and_additions
 from chia.util.hash import std_hash
 from chia.util.ints import uint32, uint64
 
 log = logging.getLogger(__name__)
 
+#  peak ->  o
+#  main     |
+#  chain    o  o  <- peak_height  \  additions and removals
+#           |  |     peak_hash    |  from these blocks are
+#           o  o                  /  recorded
+#            \ /
+#             o  <- fork_height
+#             |     this block is shared by the main chain
+#             o     and the fork
+#             :
+
+
+@dataclass
+class ForkInfo:
+    # defines the last block shared by the fork and the main chain. additions
+    # and removals are from the block following this height up to and including
+    # the peak_height
+    fork_height: int
+    # the ForkInfo object contains all additions and removals made by blocks
+    # starting at fork_height+1 up to and including peak_height.
+    # When validating the block at height 0, the peak_height is -1, which is
+    # why this field needs to be a signed integer
+    peak_height: int
+    # the header hash of the peak block of this fork
+    peak_hash: bytes32
+    # The additions include coinbase additions
+    # coin, creation-height, timestamp
+    additions_since_fork: Dict[bytes32, Tuple[Coin, uint32, uint64]] = field(default_factory=dict)
+    # coin-id
+    removals_since_fork: Set[bytes32] = field(default_factory=set)
+
+    def include_spends(self, npc_result: Optional[NPCResult], block: FullBlock) -> None:
+        height = block.height
+        if npc_result is not None:
+            assert npc_result.conds is not None
+            assert block.foliage_transaction_block is not None
+            timestamp = block.foliage_transaction_block.timestamp
+            for spend in npc_result.conds.spends:
+                self.removals_since_fork.add(bytes32(spend.coin_id))
+                for puzzle_hash, amount, _ in spend.create_coin:
+                    coin = Coin(bytes32(spend.coin_id), bytes32(puzzle_hash), uint64(amount))
+                    self.additions_since_fork[coin.name()] = (coin, height, timestamp)
+        for coin in block.get_included_reward_coins():
+            assert block.foliage_transaction_block is not None
+            timestamp = block.foliage_transaction_block.timestamp
+            assert coin.name() not in self.additions_since_fork
+            self.additions_since_fork[coin.name()] = (coin, block.height, timestamp)
+
 
 async def validate_block_body(
     constants: ConsensusConstants,
@@ -43,7 +90,7 @@ async def validate_block_body(
     block: Union[FullBlock, UnfinishedBlock],
     height: uint32,
     npc_result: Optional[NPCResult],
-    fork_point_with_peak: Optional[uint32],
+    fork_info: ForkInfo,
     get_block_generator: Callable[[BlockInfo], Awaitable[Optional[BlockGenerator]]],
     *,
     validate_signature: bool = True,
@@ -55,6 +102,9 @@ async def validate_block_body(
     only if validation succeeded, and there are transactions. In other cases it returns None. The NPC result is
     the result of running the generator with the previous generators refs. It is only present for transaction
     blocks which have spent coins.
+    fork_info specifies the fork context of this block. If the block extends
+    the main chain, it can be empty, but if the block extends a fork of the
+    main chain, fork_info is mandatory in order to validate the block.
""" if isinstance(block, FullBlock): assert height == block.height @@ -72,13 +122,17 @@ async def validate_block_body( ): return Err.NOT_BLOCK_BUT_HAS_DATA, None - prev_tb: BlockRecord = blocks.block_record(block.prev_header_hash) + prev_tb: Optional[BlockRecord] = await blocks.get_block_record_from_db(block.prev_header_hash) + assert prev_tb is not None while not prev_tb.is_transaction_block: - prev_tb = blocks.block_record(prev_tb.prev_hash) + prev_tb = await blocks.get_block_record_from_db(prev_tb.prev_hash) + assert prev_tb is not None assert prev_tb.timestamp is not None if len(block.transactions_generator_ref_list) > 0: return Err.NOT_BLOCK_BUT_HAS_DATA, None + assert fork_info.peak_height == height - 1 + return None, None # This means the block is valid # All checks below this point correspond to transaction blocks @@ -102,7 +156,10 @@ async def validate_block_body( # If height == 0, expected_reward_coins will be left empty if height > 0: # Add reward claims for all blocks from the prev prev block, until the prev block (including the latter) - prev_transaction_block = blocks.block_record(block.foliage_transaction_block.prev_transaction_block_hash) + prev_transaction_block = await blocks.get_block_record_from_db( + block.foliage_transaction_block.prev_transaction_block_hash + ) + assert prev_transaction_block is not None prev_transaction_block_height = prev_transaction_block.height assert prev_transaction_block.timestamp prev_transaction_block_timestamp = prev_transaction_block.timestamp @@ -125,7 +182,8 @@ async def validate_block_body( # For the second block in the chain, don't go back further if prev_transaction_block.height > 0: - curr_b = blocks.block_record(prev_transaction_block.prev_hash) + curr_b = await blocks.get_block_record_from_db(prev_transaction_block.prev_hash) + assert curr_b is not None while not curr_b.is_transaction_block: expected_reward_coins.add( create_pool_coin( @@ -143,7 +201,8 @@ async def validate_block_body( constants.GENESIS_CHALLENGE, ) ) - curr_b = blocks.block_record(curr_b.prev_hash) + curr_b = await blocks.get_block_record_from_db(curr_b.prev_hash) + assert curr_b is not None if set(block.transactions_info.reward_claims_incorporated) != expected_reward_coins: return Err.INVALID_REWARD_COINS, None @@ -279,77 +338,9 @@ async def validate_block_body( # 15. Check if removals exist and were not previously spent. 
(unspent_db + diff_store + this_block) # The fork point is the last block in common between the peak chain and the chain of `block` - if peak is None or height == 0: - fork_h: int = -1 - elif fork_point_with_peak is not None: - fork_h = fork_point_with_peak - else: - fork_h = find_fork_point_in_chain(blocks, peak, blocks.block_record(block.prev_header_hash)) - # Get additions and removals since (after) fork_h but not including this block - # The values include: the coin that was added, the height of the block in which it was confirmed, and the - # timestamp of the block in which it was confirmed - additions_since_fork: Dict[bytes32, Tuple[Coin, uint32, uint64]] = {} # This includes coinbase additions - removals_since_fork: Set[bytes32] = set() - - # For height 0, there are no additions and removals before this block, so we can skip - if height > 0: - # First, get all the blocks in the fork > fork_h, < block.height - prev_block: Optional[FullBlock] = await block_store.get_full_block(block.prev_header_hash) - reorg_blocks: Dict[uint32, FullBlock] = {} - curr: Optional[FullBlock] = prev_block - assert curr is not None - while curr.height > fork_h: - if curr.height == 0: - break - curr = await block_store.get_full_block(curr.prev_header_hash) - assert curr is not None - reorg_blocks[curr.height] = curr - if fork_h != -1: - assert len(reorg_blocks) == height - fork_h - 1 - - curr = prev_block - assert curr is not None - while curr.height > fork_h: - # Coin store doesn't contain coins from fork, we have to run generator for each block in fork - if curr.transactions_generator is not None: - # These blocks are in the past and therefore assumed to be valid, so get_block_generator won't raise - curr_block_generator: Optional[BlockGenerator] = await get_block_generator(curr) - assert curr_block_generator is not None and curr.transactions_info is not None - curr_npc_result = get_name_puzzle_conditions( - curr_block_generator, - min(constants.MAX_BLOCK_COST_CLVM, curr.transactions_info.cost), - mempool_mode=False, - height=curr.height, - constants=constants, - ) - removals_in_curr, additions_in_curr = tx_removals_and_additions(curr_npc_result.conds) - else: - removals_in_curr = [] - additions_in_curr = [] - - for c_name in removals_in_curr: - assert c_name not in removals_since_fork - removals_since_fork.add(c_name) - for c in additions_in_curr: - coin_name = c.name() - assert coin_name not in additions_since_fork - assert curr.foliage_transaction_block is not None - additions_since_fork[coin_name] = (c, curr.height, curr.foliage_transaction_block.timestamp) - - for coinbase_coin in curr.get_included_reward_coins(): - coin_name = coinbase_coin.name() - assert coin_name not in additions_since_fork - assert curr.foliage_transaction_block is not None - additions_since_fork[coin_name] = ( - coinbase_coin, - curr.height, - curr.foliage_transaction_block.timestamp, - ) - if curr.height == 0: - break - curr = reorg_blocks[uint32(curr.height - 1)] - assert curr is not None + assert fork_info.fork_height < height + assert fork_info.peak_height == height - 1 removal_coin_records: Dict[bytes32, CoinRecord] = {} # the removed coins we need to look up from the DB @@ -370,7 +361,7 @@ async def validate_block_body( else: # This check applies to both coins created before fork (pulled from coin_store), # and coins created after fork (additions_since_fork) - if rem in removals_since_fork: + if rem in fork_info.removals_since_fork: # This coin was spent in the fork return Err.DOUBLE_SPEND_IN_FORK, None 
removals_from_db.append(rem)
@@ -381,10 +372,10 @@ async def validate_block_body(
         # can't find in the DB, but also coins that were spent after the fork point
         look_in_fork: List[bytes32] = []
         for unspent in unspent_records:
-            if unspent.confirmed_block_index <= fork_h:
+            if unspent.confirmed_block_index <= fork_info.fork_height:
                 # Spending something in the current chain, confirmed before fork
                 # (We ignore all coins confirmed after fork)
-                if unspent.spent == 1 and unspent.spent_block_index <= fork_h:
+                if unspent.spent == 1 and unspent.spent_block_index <= fork_info.fork_height:
                     # Check for coins spent in an ancestor block
                     return Err.DOUBLE_SPEND, None
                 removal_coin_records[unspent.name] = unspent
@@ -402,11 +393,11 @@ async def validate_block_body(
 
     for rem in look_in_fork:
         # This coin is not in the current heaviest chain, so it must be in the fork
-        if rem not in additions_since_fork:
+        if rem not in fork_info.additions_since_fork:
             # Check for spending a coin that does not exist in this fork
             log.error(f"Err.UNKNOWN_UNSPENT: COIN ID: {rem} NPC RESULT: {npc_result}")
             return Err.UNKNOWN_UNSPENT, None
-        new_coin, confirmed_height, confirmed_timestamp = additions_since_fork[rem]
+        new_coin, confirmed_height, confirmed_timestamp = fork_info.additions_since_fork[rem]
         new_coin_record: CoinRecord = CoinRecord(
             new_coin,
             confirmed_height,
diff --git a/chia/consensus/blockchain.py b/chia/consensus/blockchain.py
index 7ef0e05f27a3..8ee0850f08be 100644
--- a/chia/consensus/blockchain.py
+++ b/chia/consensus/blockchain.py
@@ -5,6 +5,7 @@
 import enum
 import logging
 import multiprocessing
+import time
 import traceback
 from concurrent.futures import Executor
 from concurrent.futures.process import ProcessPoolExecutor
@@ -13,14 +14,14 @@
 from pathlib import Path
 from typing import Dict, List, Optional, Set, Tuple
 
-from chia.consensus.block_body_validation import validate_block_body
+from chia.consensus.block_body_validation import ForkInfo, validate_block_body
 from chia.consensus.block_header_validation import validate_unfinished_header_block
 from chia.consensus.block_record import BlockRecord
 from chia.consensus.blockchain_interface import BlockchainInterface
 from chia.consensus.constants import ConsensusConstants
 from chia.consensus.cost_calculator import NPCResult
 from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty
-from chia.consensus.find_fork_point import find_fork_point_in_chain
+from chia.consensus.find_fork_point import lookup_fork_chain
 from chia.consensus.full_block_to_block_record import block_to_block_record
 from chia.consensus.multiprocess_validation import (
     PreValidationResult,
@@ -203,11 +204,74 @@ async def get_full_peak(self) -> Optional[FullBlock]:
     async def get_full_block(self, header_hash: bytes32) -> Optional[FullBlock]:
         return await self.block_store.get_full_block(header_hash)
 
+    async def advance_fork_info(
+        self, block: FullBlock, fork_info: ForkInfo, additional_blocks: Dict[bytes32, FullBlock]
+    ) -> None:
+        """
+        This function is used to advance the peak_height of fork_info given the
+        full block extending the chain. block is required to be the next block on
+        top of fork_info.peak_height. If the block is part of the main chain,
+        the fork_height will be set to the same as the peak, making the fork_info
+        represent an empty fork chain.
+        If the block is part of a fork, we need to compute the additions and
+        removals to update the fork_info object. This is an expensive operation.
+ """ + + assert fork_info.peak_height <= block.height - 1 + assert fork_info.peak_hash != block.header_hash + + if fork_info.peak_hash == block.prev_header_hash: + assert fork_info.peak_height == block.height - 1 + return + + # note that we're not technically finding a fork here, we just traverse + # from the current block down to the fork's current peak + chain, peak_hash = await lookup_fork_chain( + self, + (uint32(fork_info.peak_height), fork_info.peak_hash), + (uint32(block.height - 1), block.prev_header_hash), + ) + # the ForkInfo object is expected to be valid, just having its peak + # behind the current block + assert peak_hash == fork_info.peak_hash + assert len(chain) == block.height - fork_info.peak_height - 1 + + for height in range(fork_info.peak_height + 1, block.height): + fork_block: Optional[FullBlock] = await self.block_store.get_full_block(chain[uint32(height)]) + assert fork_block is not None + await self.run_single_block(fork_block, fork_info, additional_blocks) + + async def run_single_block( + self, block: FullBlock, fork_info: ForkInfo, additional_blocks: Dict[bytes32, FullBlock] + ) -> None: + assert fork_info.peak_height == block.height - 1 + assert block.height == 0 or fork_info.peak_hash == block.prev_header_hash + + npc: Optional[NPCResult] = None + if block.transactions_generator is not None: + block_generator: Optional[BlockGenerator] = await self.get_block_generator(block, additional_blocks) + assert block_generator is not None + assert block.transactions_info is not None + assert block.foliage_transaction_block is not None + npc = get_name_puzzle_conditions( + block_generator, + block.transactions_info.cost, + mempool_mode=False, + height=block.height, + constants=self.constants, + ) + assert npc.error is None + + fork_info.include_spends(npc, block) + + fork_info.peak_height = block.height + fork_info.peak_hash = block.header_hash + async def add_block( self, block: FullBlock, pre_validation_result: PreValidationResult, - fork_point_with_peak: Optional[uint32] = None, + fork_info: Optional[ForkInfo] = None, ) -> Tuple[AddBlockResult, Optional[Err], Optional[StateChangeSummary]]: """ This method must be called under the blockchain lock @@ -219,7 +283,8 @@ async def add_block( Args: block: The FullBlock to be validated. pre_validation_result: A result of successful pre validation - fork_point_with_peak: The fork point, for efficiency reasons, if None, it will be recomputed + fork_info: Information about the fork chain this block is part of, + to make validation more efficient. This is an in-out parameter. Returns: The result of adding the block to the blockchain (NEW_PEAK, ADDED_AS_ORPHAN, INVALID_BLOCK, @@ -231,15 +296,25 @@ async def add_block( - A list of NPCResult for any new transaction block added to the chain """ + if block.height == 0 and block.prev_header_hash != self.constants.GENESIS_CHALLENGE: + return AddBlockResult.INVALID_BLOCK, Err.INVALID_PREV_BLOCK_HASH, None + + peak = self.get_peak() genesis: bool = block.height == 0 - if self.contains_block(block.header_hash): - return AddBlockResult.ALREADY_HAVE_BLOCK, None, None + extending_main_chain: bool = genesis or peak is None or (block.prev_header_hash == peak.header_hash) - if not self.contains_block(block.prev_header_hash) and not genesis: - return AddBlockResult.DISCONNECTED_BLOCK, Err.INVALID_PREV_BLOCK_HASH, None + # first check if this block is disconnected from the currently known + # blocks. We can only accept blocks that are connected to another block + # we know of. 
+ prev_block: Optional[BlockRecord] = None + if not extending_main_chain: + prev_block = await self.get_block_record_from_db(block.prev_header_hash) + if not genesis: + if prev_block is None: + return AddBlockResult.DISCONNECTED_BLOCK, Err.INVALID_PREV_BLOCK_HASH, None - if not genesis and (self.block_record(block.prev_header_hash).height + 1) != block.height: - return AddBlockResult.INVALID_BLOCK, Err.INVALID_HEIGHT, None + if prev_block.height + 1 != block.height: + return AddBlockResult.INVALID_BLOCK, Err.INVALID_HEIGHT, None npc_result: Optional[NPCResult] = pre_validation_result.npc_result required_iters = pre_validation_result.required_iters @@ -247,6 +322,93 @@ async def add_block( return AddBlockResult.INVALID_BLOCK, Err(pre_validation_result.error), None assert required_iters is not None + header_hash: bytes32 = block.header_hash + + # maybe fork_info should be mandatory to pass in, but we have a lot of + # tests that make sure the Blockchain object can handle any blocks, + # including orphaned ones, without any fork context + if fork_info is None: + # remember that this fork_info object is temporary and won't be + # returned to the caller. It means we can save time by not updating + # it once the block validates + temporary_fork_info = True + + if await self.contains_block_from_db(header_hash): + # this means we have already seen and validated this block. + return AddBlockResult.ALREADY_HAVE_BLOCK, None, None + elif extending_main_chain: + # this is the common and efficient case where we extend the main + # chain. The fork_info can be empty + prev_height = block.height - 1 + fork_info = ForkInfo(prev_height, prev_height, block.prev_header_hash) + else: + assert peak is not None + # the block is extending a fork, and we don't have any fork_info + # for it. This can potentially be quite expensive and we should + # try to avoid getting here + + # first, collect all the block hashes of the forked chain + # the block we're trying to add doesn't exist in the chain yet, + # so we need to start traversing from its prev_header_hash + fork_chain, fork_hash = await lookup_fork_chain( + self, (peak.height, peak.header_hash), (uint32(block.height - 1), block.prev_header_hash) + ) + # now we know how long the fork is, and can compute the fork + # height. + fork_height = block.height - len(fork_chain) - 1 + fork_info = ForkInfo(fork_height, fork_height, fork_hash) + + log.warning( + f"slow path in block validation. Building coin set for fork ({fork_height}, {block.height})" + ) + + # now run all the blocks of the fork to compute the additions + # and removals. They are recorded in the fork_info object + counter = 0 + start = time.monotonic() + for height in range(fork_info.fork_height + 1, block.height): + fork_block: Optional[FullBlock] = await self.block_store.get_full_block(fork_chain[uint32(height)]) + assert fork_block is not None + assert fork_block.height - 1 == fork_info.peak_height + assert fork_block.height == 0 or fork_block.prev_header_hash == fork_info.peak_hash + await self.run_single_block(fork_block, fork_info, {}) + counter += 1 + end = time.monotonic() + log.info( + f"executed {counter} block generators in {end - start:2f} s. 
" + f"{len(fork_info.additions_since_fork)} additions, " + f"{len(fork_info.removals_since_fork)} removals" + ) + + else: + temporary_fork_info = False + if extending_main_chain: + fork_info.fork_height = block.height - 1 + fork_info.peak_height = block.height - 1 + fork_info.peak_hash = block.prev_header_hash + fork_info.additions_since_fork == {} + fork_info.removals_since_fork == set() + + if await self.contains_block_from_db(header_hash): + # We have already validated the block, but if it's not part of the + # main chain, we still need to re-run it to update the additions and + # removals in fork_info. + await self.advance_fork_info(block, fork_info, {}) + fork_info.include_spends(npc_result, block) + fork_info.peak_height = int(block.height) + fork_info.peak_hash = header_hash + + return AddBlockResult.ALREADY_HAVE_BLOCK, None, None + + if fork_info.peak_hash != block.prev_header_hash: + await self.advance_fork_info(block, fork_info, {}) + + # if these prerequisites of the fork_info aren't met, the fork_info + # object is invalid for this block. If the caller would have passed in + # None, a valid fork_info would have been computed + assert fork_info.peak_height == block.height - 1 + assert block.height == 0 or fork_info.peak_hash == block.prev_header_hash + error_code, _ = await validate_block_body( self.constants, self, @@ -256,7 +418,7 @@ async def add_block( block, block.height, npc_result, - fork_point_with_peak, + fork_info, self.get_block_generator, # If we did not already validate the signature, validate it now validate_signature=not pre_validation_result.validated_signature, @@ -264,6 +426,19 @@ async def add_block( if error_code is not None: return AddBlockResult.INVALID_BLOCK, error_code, None + # commit the additions and removals from this block into the ForkInfo, in + # case we're validating blocks on a fork, the next block validation will + # need to know of these additions and removals + if not temporary_fork_info: + assert fork_info.peak_height == block.height - 1 + fork_info.include_spends(npc_result, block) + fork_info.peak_height = int(block.height) + fork_info.peak_hash = header_hash + + # block_to_block_record() require the previous block in the cache + if not genesis and prev_block is not None: + self.add_block_record(prev_block) + block_record = block_to_block_record( self.constants, self, @@ -279,11 +454,10 @@ async def add_block( try: # Always add the block to the database async with self.block_store.db_wrapper.writer(): - header_hash: bytes32 = block.header_hash # Perform the DB operations to update the state, and rollback if something goes wrong await self.block_store.add_full_block(header_hash, block, block_record) records, state_change_summary = await self._reconsider_peak( - block_record, genesis, fork_point_with_peak, npc_result + block_record, genesis, fork_info, npc_result ) # Then update the memory cache. 
It is important that this is not cancelled and does not throw @@ -311,7 +485,7 @@ async def add_block( self.block_store.rollback_cache_block(header_hash) self._peak_height = previous_peak_height log.error( - f"Error while adding block {block.header_hash} height {block.height}," + f"Error while adding block {header_hash} height {block.height}," f" rolling back: {traceback.format_exc()} {e}" ) raise @@ -331,7 +505,7 @@ async def _reconsider_peak( self, block_record: BlockRecord, genesis: bool, - fork_point_with_peak: Optional[uint32], + fork_info: ForkInfo, npc_result: Optional[NPCResult], ) -> Tuple[List[BlockRecord], Optional[StateChangeSummary]]: """ @@ -374,17 +548,8 @@ async def _reconsider_peak( # This is not a heavier block than the heaviest we have seen, so we don't change the coin set return [], None - # Finds the fork. if the block is just being appended, it will return the peak - # If no blocks in common, returns -1, and reverts all blocks - if block_record.prev_hash == peak.header_hash: - fork_height: int = peak.height - elif fork_point_with_peak is not None: - fork_height = fork_point_with_peak - else: - fork_height = find_fork_point_in_chain(self, block_record, peak) - if block_record.prev_hash != peak.header_hash: - for coin_record in await self.coin_store.rollback_to_block(fork_height): + for coin_record in await self.coin_store.rollback_to_block(fork_info.fork_height): rolled_back_state[coin_record.name] = coin_record # Collects all blocks from fork point to new peak @@ -392,7 +557,7 @@ async def _reconsider_peak( curr = block_record.header_hash # Backtracks up to the fork point, pulling all the required blocks from DB (that will soon be in the chain) - while fork_height < 0 or curr != self.height_to_hash(uint32(fork_height)): + while fork_info.fork_height < 0 or curr != self.height_to_hash(uint32(fork_info.fork_height)): fetched_full_block: Optional[FullBlock] = await self.block_store.get_full_block(curr) fetched_block_record: Optional[BlockRecord] = await self.block_store.get_block_record(curr) assert fetched_full_block is not None @@ -438,14 +603,18 @@ async def _reconsider_peak( # we made it to the end successfully # Rollback sub_epoch_summaries - await self.block_store.rollback(fork_height) + await self.block_store.rollback(fork_info.fork_height) await self.block_store.set_in_chain([(br.header_hash,) for br in records_to_add]) # Changes the peak to be the new peak await self.block_store.set_peak(block_record.header_hash) return records_to_add, StateChangeSummary( - block_record, uint32(max(fork_height, 0)), list(rolled_back_state.values()), npc_results, reward_coins + block_record, + uint32(max(fork_info.fork_height, 0)), + list(rolled_back_state.values()), + npc_results, + reward_coins, ) async def get_tx_removals_and_additions( @@ -631,6 +800,8 @@ async def validate_unfinished_block( else self.block_record(block.prev_header_hash).height ) + fork_info = ForkInfo(prev_height, prev_height, block.prev_header_hash) + error_code, cost_result = await validate_block_body( self.constants, self, @@ -640,7 +811,7 @@ async def validate_unfinished_block( block, uint32(prev_height + 1), npc_result, - None, + fork_info, self.get_block_generator, validate_signature=False, # Signature was already validated before calling this method, no need to validate ) @@ -848,10 +1019,32 @@ async def get_block_records_at(self, heights: List[uint32], batch_size: int = 90 return records async def get_block_record_from_db(self, header_hash: bytes32) -> Optional[BlockRecord]: - if header_hash in 
self.__block_records: - return self.__block_records[header_hash] + ret = self.__block_records.get(header_hash) + if ret is not None: + return ret return await self.block_store.get_block_record(header_hash) + async def prev_block_hash(self, header_hashes: List[bytes32]) -> List[bytes32]: + """ + Given a list of block header hashes, returns the previous header hashes + for each block, in the order they were passed in. + """ + ret = [] + for h in header_hashes: + b = self.__block_records.get(h) + if b is not None: + ret.append(b.prev_hash) + else: + ret.append(await self.block_store.get_prev_hash(h)) + return ret + + async def contains_block_from_db(self, header_hash: bytes32) -> bool: + ret = header_hash in self.__block_records + if ret: + return True + + return (await self.block_store.get_block_record(header_hash)) is not None + def remove_block_record(self, header_hash: bytes32) -> None: sbr = self.block_record(header_hash) del self.__block_records[header_hash] @@ -919,55 +1112,38 @@ async def get_block_generator( result = await self.block_store.get_generators_at(block.transactions_generator_ref_list) else: # First tries to find the blocks in additional_blocks - reorg_chain: Dict[uint32, FullBlock] = {} curr = block additional_height_dict = {} while curr.prev_header_hash in additional_blocks: prev: FullBlock = additional_blocks[curr.prev_header_hash] additional_height_dict[prev.height] = prev - if isinstance(curr, FullBlock): - assert curr.height == prev.height + 1 - reorg_chain[prev.height] = prev curr = prev peak: Optional[BlockRecord] = self.get_peak() - if self.contains_block(curr.prev_header_hash) and peak is not None: + prev_block_record = await self.get_block_record_from_db(curr.prev_header_hash) + reorg_chain: Dict[uint32, bytes32] = {} + if prev_block_record is not None and peak is not None: # Then we look up blocks up to fork point one at a time, backtracking - previous_block_hash = curr.prev_header_hash - prev_block_record = await self.block_store.get_block_record(previous_block_hash) - prev_block = await self.block_store.get_full_block(previous_block_hash) - assert prev_block is not None - assert prev_block_record is not None - fork = find_fork_point_in_chain(self, peak, prev_block_record) - curr_2: Optional[FullBlock] = prev_block - assert curr_2 is not None and isinstance(curr_2, FullBlock) - reorg_chain[curr_2.height] = curr_2 - while curr_2.height > fork and curr_2.height > 0: - curr_2 = await self.block_store.get_full_block(curr_2.prev_header_hash) - assert curr_2 is not None - reorg_chain[curr_2.height] = curr_2 + height_to_hash, _ = await lookup_fork_chain( + self, + (peak.height, peak.header_hash), + (prev_block_record.height, prev_block_record.header_hash), + ) + reorg_chain.update(height_to_hash) for ref_height in block.transactions_generator_ref_list: - if ref_height in reorg_chain: - ref_block = reorg_chain[ref_height] - assert ref_block is not None + if ref_height in additional_height_dict: + ref_block = additional_height_dict[ref_height] if ref_block.transactions_generator is None: raise ValueError(Err.GENERATOR_REF_HAS_NO_GENERATOR) result.append(ref_block.transactions_generator) + elif ref_height in reorg_chain: + gen = await self.block_store.get_generator(reorg_chain[ref_height]) + if gen is None: + raise ValueError(Err.GENERATOR_REF_HAS_NO_GENERATOR) + result.append(gen) else: - if ref_height in additional_height_dict: - ref_block = additional_height_dict[ref_height] - assert ref_block is not None - if ref_block.transactions_generator is None: - raise 
ValueError(Err.GENERATOR_REF_HAS_NO_GENERATOR) - result.append(ref_block.transactions_generator) - else: - header_hash = self.height_to_hash(ref_height) - if header_hash is None: - raise ValueError(Err.GENERATOR_REF_HAS_NO_GENERATOR) - gen = await self.block_store.get_generator(header_hash) - if gen is None: - raise ValueError(Err.GENERATOR_REF_HAS_NO_GENERATOR) - result.append(gen) + [gen] = await self.block_store.get_generators_at([ref_height]) + result.append(gen) assert len(result) == len(ref_list) return BlockGenerator(block.transactions_generator, result, []) diff --git a/chia/consensus/blockchain_interface.py b/chia/consensus/blockchain_interface.py index bb948f6e1ce3..cf0eb7b55e7c 100644 --- a/chia/consensus/blockchain_interface.py +++ b/chia/consensus/blockchain_interface.py @@ -41,6 +41,9 @@ def contains_block(self, header_hash: bytes32) -> bool: # ignoring hinting error until we handle our interfaces more formally return # type: ignore[return-value] + async def contains_block_from_db(self, header_hash: bytes32) -> bool: + return # type: ignore[return-value] + def remove_block_record(self, header_hash: bytes32) -> None: pass @@ -61,6 +64,9 @@ async def get_block_records_in_range(self, start: int, stop: int) -> Dict[bytes3 # ignoring hinting error until we handle our interfaces more formally return # type: ignore[return-value] + async def prev_block_hash(self, header_hashes: List[bytes32]) -> List[bytes32]: + return # type: ignore[return-value] + async def get_header_blocks_in_range( self, start: int, stop: int, tx_filter: bool = True ) -> Dict[bytes32, HeaderBlock]: diff --git a/chia/consensus/find_fork_point.py b/chia/consensus/find_fork_point.py index 1dc233c15ad2..15ba6f0028fb 100644 --- a/chia/consensus/find_fork_point.py +++ b/chia/consensus/find_fork_point.py @@ -1,13 +1,15 @@ from __future__ import annotations -from typing import Union +from typing import Dict, Tuple, Union from chia.consensus.block_record import BlockRecord from chia.consensus.blockchain_interface import BlockchainInterface +from chia.types.blockchain_format.sized_bytes import bytes32 from chia.types.header_block import HeaderBlock +from chia.util.ints import uint32 -def find_fork_point_in_chain( +async def find_fork_point_in_chain( blocks: BlockchainInterface, block_1: Union[BlockRecord, HeaderBlock], block_2: Union[BlockRecord, HeaderBlock], @@ -17,19 +19,87 @@ def find_fork_point_in_chain( Returns -1 if chains have no common ancestor * assumes the fork point is loaded in blocks """ - while block_2.height > 0 or block_1.height > 0: - if block_2.height > block_1.height: - block_2 = blocks.block_record(block_2.prev_hash) - elif block_1.height > block_2.height: - block_1 = blocks.block_record(block_1.prev_hash) - else: - if block_2.header_hash == block_1.header_hash: - return block_2.height - block_2 = blocks.block_record(block_2.prev_hash) - block_1 = blocks.block_record(block_1.prev_hash) - if block_2 != block_1: + height_1 = int(block_1.height) + height_2 = int(block_2.height) + bh_1 = block_1.header_hash + bh_2 = block_2.header_hash + + # special case for first level, since we actually already know the previous + # hash + if height_1 > height_2: + bh_1 = block_1.prev_hash + height_1 -= 1 + elif height_2 > height_1: + bh_2 = block_2.prev_hash + height_2 -= 1 + + while height_1 > height_2: + [bh_1] = await blocks.prev_block_hash([bh_1]) + height_1 -= 1 + + while height_2 > height_1: + [bh_2] = await blocks.prev_block_hash([bh_2]) + height_2 -= 1 + + assert height_1 == height_2 + + height = height_2 + 
while height > 0: + if bh_1 == bh_2: + return height + [bh_1, bh_2] = await blocks.prev_block_hash([bh_1, bh_2]) + height -= 1 + + if bh_2 != bh_1: # All blocks are different return -1 # First block is the same return 0 + + +async def lookup_fork_chain( + blocks: BlockchainInterface, + block_1: Tuple[uint32, bytes32], + block_2: Tuple[uint32, bytes32], +) -> Tuple[Dict[uint32, bytes32], bytes32]: + """Tries to find height where new chain (block_2) diverged from block_1. + The inputs are (height, header-hash)-tuples. + Returns two values: + 1. The height to hash map of block_2's chain down to, but not + including, the fork height + 2. The header hash of the block at the fork height + """ + height_1 = int(block_1[0]) + bh_1 = block_1[1] + height_2 = int(block_2[0]) + bh_2 = block_2[1] + + ret: Dict[uint32, bytes32] = {} + + while height_1 > height_2: + [bh_1] = await blocks.prev_block_hash([bh_1]) + height_1 -= 1 + + while height_2 > height_1: + ret[uint32(height_2)] = bh_2 + [bh_2] = await blocks.prev_block_hash([bh_2]) + height_2 -= 1 + + assert height_1 == height_2 + + height = height_2 + while height > 0: + if bh_1 == bh_2: + return (ret, bh_2) + ret[uint32(height)] = bh_2 + [bh_1, bh_2] = await blocks.prev_block_hash([bh_1, bh_2]) + height -= 1 + + if bh_1 == bh_2: + return (ret, bh_2) + + ret[uint32(0)] = bh_2 + # TODO: if we would pass in the consensus constants, we could return the + # GENESIS_CHALLENGE hash here, instead of zeros + return (ret, bytes32([0] * 32)) diff --git a/chia/consensus/multiprocess_validation.py b/chia/consensus/multiprocess_validation.py index 98d95fe42d8f..1aacfaa4f245 100644 --- a/chia/consensus/multiprocess_validation.py +++ b/chia/consensus/multiprocess_validation.py @@ -197,9 +197,9 @@ async def pre_validate_blocks_multiprocessing( num_sub_slots_found = 0 num_blocks_seen = 0 if blocks[0].height > 0: - if not block_records.contains_block(blocks[0].prev_header_hash): + curr = await block_records.get_block_record_from_db(blocks[0].prev_header_hash) + if curr is None: return [PreValidationResult(uint16(Err.INVALID_PREV_BLOCK_HASH.value), None, None, False)] - curr = block_records.block_record(blocks[0].prev_header_hash) num_sub_slots_to_look_for = 3 if curr.overflow else 2 header_hash = curr.header_hash while ( @@ -214,7 +214,8 @@ async def pre_validate_blocks_multiprocessing( if curr.is_transaction_block: num_blocks_seen += 1 header_hash = curr.prev_hash - curr = block_records.block_record(header_hash) + curr = await block_records.get_block_record_from_db(curr.prev_hash) + assert curr is not None recent_blocks[header_hash] = curr block_record_was_present = [] @@ -227,9 +228,30 @@ async def pre_validate_blocks_multiprocessing( diff_ssis: List[Tuple[uint64, uint64]] = [] for block in blocks: if block.height != 0: - assert block_records.contains_block(block.prev_header_hash) if prev_b is None: - prev_b = block_records.block_record(block.prev_header_hash) + prev_b = await block_records.get_block_record_from_db(block.prev_header_hash) + assert prev_b is not None + + # the call to block_to_block_record() requires the previous + # block is in the cache + # and make_sub_epoch_summary() requires all blocks until we find one + # that includes a sub_epoch_summary + curr = prev_b + block_records.add_block_record(curr) + counter = 0 + # TODO: It would probably be better to make + # get_next_sub_slot_iters_and_difficulty() async and able to pull + # from the database rather than trying to predict which blocks it + # may need in the cache + while ( + 
curr.sub_epoch_summary_included is None
+                or counter < 3 * constants.MAX_SUB_SLOT_BLOCKS + constants.MIN_BLOCKS_PER_CHALLENGE_BLOCK + 3
+            ):
+                curr = await block_records.get_block_record_from_db(curr.prev_hash)
+                if curr is None:
+                    break
+                block_records.add_block_record(curr)
+                counter += 1
 
         sub_slot_iters, difficulty = get_next_sub_slot_iters_and_difficulty(
             constants, len(block.finished_sub_slots) > 0, prev_b, block_records
diff --git a/chia/full_node/block_store.py b/chia/full_node/block_store.py
index 31af6da68f2f..2badae0160c6 100644
--- a/chia/full_node/block_store.py
+++ b/chia/full_node/block_store.py
@@ -342,6 +342,25 @@ async def get_block_records_by_hash(self, header_hashes: List[bytes32]) -> List[
             ret.append(all_blocks[hh])
         return ret
 
+    async def get_prev_hash(self, header_hash: bytes32) -> bytes32:
+        """
+        Returns the header hash preceding the input header hash.
+        Throws an exception if the block is not present
+        """
+        cached = self.block_cache.get(header_hash)
+        if cached is not None:
+            return cached.prev_header_hash
+
+        async with self.db_wrapper.reader_no_transaction() as conn:
+            async with conn.execute(
+                "SELECT prev_hash FROM full_blocks WHERE header_hash=?",
+                (header_hash,),
+            ) as cursor:
+                row = await cursor.fetchone()
+                if row is None:
+                    raise KeyError("missing block in chain")
+                return bytes32(row[0])
+
     async def get_block_bytes_by_hash(self, header_hashes: List[bytes32]) -> List[bytes]:
         """
         Returns a list of Full Blocks block blobs, ordered by the same order in which header_hashes are passed in.
diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py
index 79e180be8d7d..e165e743fe29 100644
--- a/chia/full_node/full_node.py
+++ b/chia/full_node/full_node.py
@@ -29,6 +29,7 @@
 
 from chia_rs import AugSchemeMPL
 
+from chia.consensus.block_body_validation import ForkInfo
 from chia.consensus.block_creation import unfinished_block_to_full_block
 from chia.consensus.block_record import BlockRecord
 from chia.consensus.blockchain import AddBlockResult, Blockchain, BlockchainMutexPriority, StateChangeSummary
@@ -1051,6 +1052,10 @@ async def sync_from_fork_point(
         )
         batch_size = self.constants.MAX_BLOCK_COUNT_PER_REQUESTS
 
+        # normally "fork_point" or "fork_height" refers to the first common
+        # block between the main chain and the fork. Here "fork_point_height"
+        # seems to refer to the first diverging block
+
         async def fetch_block_batches(
             batch_queue: asyncio.Queue[Optional[Tuple[WSChiaConnection, List[FullBlock]]]]
         ) -> None:
@@ -1089,7 +1094,8 @@ async def fetch_block_batches(
         async def validate_block_batches(
             inner_batch_queue: asyncio.Queue[Optional[Tuple[WSChiaConnection, List[FullBlock]]]]
         ) -> None:
-            advanced_peak: bool = False
+            fork_info: Optional[ForkInfo] = None
+
             while True:
                 res: Optional[Tuple[WSChiaConnection, List[FullBlock]]] = await inner_batch_queue.get()
                 if res is None:
@@ -1098,8 +1104,34 @@ async def validate_block_batches(
                 peer, blocks = res
                 start_height = blocks[0].height
                 end_height = blocks[-1].height
+
+                # in case we're validating a reorg fork (i.e. not extending the
+                # main chain), we need to record the coin set from that fork in
+                # fork_info. Otherwise validation is very expensive, especially
+                # for deep reorgs
+                peak: Optional[BlockRecord]
+                if fork_info is None:
+                    peak = self.blockchain.get_peak()
+                    extending_main_chain: bool = peak is None or (
+                        peak.header_hash == blocks[0].prev_header_hash or peak.header_hash == blocks[0].header_hash
+                    )
+                    # if we're simply extending the main chain, it's important
+                    # *not* to pass in a ForkInfo object, as it can potentially
+                    # accrue a large state (with no value, since we can validate
+                    # against the CoinStore)
+                    if not extending_main_chain:
+                        if fork_point_height == 0:
+                            fork_info = ForkInfo(-1, -1, bytes32([0] * 32))
+                        else:
+                            fork_hash = self.blockchain.height_to_hash(uint32(fork_point_height - 1))
+                            assert fork_hash is not None
+                            fork_info = ForkInfo(fork_point_height - 1, fork_point_height - 1, fork_hash)
+
                 success, state_change_summary, err = await self.add_block_batch(
-                    blocks, peer.get_peer_logging(), None if advanced_peak else uint32(fork_point_height), summaries
+                    blocks,
+                    peer.get_peer_logging(),
+                    fork_info,
+                    summaries,
                 )
                 if success is False:
                     await peer.close(600)
@@ -1109,9 +1141,8 @@ async def validate_block_batches(
                         raise ValidationError(err, f"Failed to validate block batch {start_height} to {end_height}")
                     raise ValueError(f"Failed to validate block batch {start_height} to {end_height}")
                 self.log.info(f"Added blocks {start_height} to {end_height}")
-                peak: Optional[BlockRecord] = self.blockchain.get_peak()
+                peak = self.blockchain.get_peak()
                 if state_change_summary is not None:
-                    advanced_peak = True
                     assert peak is not None
                     # Hints must be added to the DB. The other post-processing tasks are not required when syncing
                     hints_to_add, _ = get_hints_and_subscription_coin_ids(
@@ -1197,17 +1228,50 @@ async def add_block_batch(
         self,
         all_blocks: List[FullBlock],
         peer_info: PeerInfo,
-        fork_point: Optional[uint32],
+        fork_info: Optional[ForkInfo],
         wp_summaries: Optional[List[SubEpochSummary]] = None,
     ) -> Tuple[bool, Optional[StateChangeSummary], Optional[Err]]:
         # Precondition: All blocks must be contiguous blocks, index i+1 must be the parent of index i
         # Returns a bool for success, as well as a StateChangeSummary if the peak was advanced
 
+        block_dict: Dict[bytes32, FullBlock] = {}
+        for block in all_blocks:
+            block_dict[block.header_hash] = block
+
         blocks_to_validate: List[FullBlock] = []
         for i, block in enumerate(all_blocks):
-            if not self.blockchain.contains_block(block.header_hash):
+            header_hash = block.header_hash
+            if not await self.blockchain.contains_block_from_db(header_hash):
                 blocks_to_validate = all_blocks[i:]
                 break
+
+            if fork_info is None:
+                continue
+            # the below section updates the fork_info object, if
+            # there is one.
+
+            # TODO: it seems unnecessary to request overlapping block ranges
+            # when syncing
+            if block.height <= fork_info.peak_height:
+                continue
+
+            # we have already validated this block once, no need to do it again.
+            # however, if this block is not part of the main chain, we need to
+            # update the fork context with its additions and removals
+            if self.blockchain.height_to_hash(block.height) == header_hash:
+                # we're on the main chain, just fast-forward the fork height
+                fork_info.fork_height = block.height
+                fork_info.peak_height = block.height
+                fork_info.peak_hash = header_hash
+                fork_info.additions_since_fork = {}
+                fork_info.removals_since_fork = set()
+            else:
+                # We have already validated the block, but if it's not part of the
+                # main chain, we still need to re-run it to update the additions and
+                # removals in fork_info. 
+ await self.blockchain.advance_fork_info(block, fork_info, block_dict) + await self.blockchain.run_single_block(block, fork_info, block_dict) + if len(blocks_to_validate) == 0: return True, None, None @@ -1235,9 +1299,8 @@ async def add_block_batch( for i, block in enumerate(blocks_to_validate): assert pre_validation_results[i].required_iters is not None state_change_summary: Optional[StateChangeSummary] - advanced_peak = agg_state_change_summary is not None result, error, state_change_summary = await self.blockchain.add_block( - block, pre_validation_results[i], None if advanced_peak else fork_point + block, pre_validation_results[i], fork_info ) if result == AddBlockResult.NEW_PEAK: @@ -1259,7 +1322,8 @@ async def add_block_batch( if error is not None: self.log.error(f"Error: {error}, Invalid block from peer: {peer_info} ") return False, agg_state_change_summary, error - block_record = self.blockchain.block_record(block.header_hash) + block_record = await self.blockchain.get_block_record_from_db(block.header_hash) + assert block_record is not None if block_record.sub_epoch_summary_included is not None: if self.weight_proof_handler is not None: await self.weight_proof_handler.create_prev_sub_epoch_segments() @@ -1428,7 +1492,7 @@ async def peak_post_processing( # This is a reorg fork_hash: Optional[bytes32] = self.blockchain.height_to_hash(state_change_summary.fork_height) assert fork_hash is not None - fork_block = self.blockchain.block_record(fork_hash) + fork_block = await self.blockchain.get_block_record_from_db(fork_hash) fns_peak_result: FullNodeStorePeakResult = self.full_node_store.new_peak( record, @@ -1579,6 +1643,7 @@ async def add_block( block: FullBlock, peer: Optional[WSChiaConnection] = None, raise_on_disconnected: bool = False, + fork_info: Optional[ForkInfo] = None, ) -> Optional[Message]: """ Add a full block from a peer full node (or ourselves). @@ -1681,7 +1746,7 @@ async def add_block( ) assert result_to_validate.required_iters == pre_validation_results[0].required_iters (added, error_code, state_change_summary) = await self.blockchain.add_block( - block, result_to_validate, None + block, result_to_validate, fork_info ) if added == AddBlockResult.ALREADY_HAVE_BLOCK: return None diff --git a/chia/simulator/block_tools.py b/chia/simulator/block_tools.py index 5555b7baebc4..e4ceeb8fc4bc 100644 --- a/chia/simulator/block_tools.py +++ b/chia/simulator/block_tools.py @@ -13,6 +13,7 @@ import time from dataclasses import dataclass, replace from pathlib import Path +from random import Random from typing import Any, Callable, Dict, List, Optional, Tuple, Union from chia_rs import ( @@ -23,6 +24,7 @@ G2Element, PrivateKey, compute_merkle_set_root, + solution_generator, ) from chiabip158 import PyBIP158 from clvm.casts import int_from_bytes @@ -186,6 +188,27 @@ def compute_additions_unchecked(sb: SpendBundle) -> List[Coin]: return ret +def make_spend_bundle(coins: List[Coin], wallet: WalletTool, rng: Random) -> Tuple[SpendBundle, List[Coin]]: + """ + makes a new spend bundle (block generator) spending some of the coins in the + list of coins. The list will be updated to have spent coins removed and new + coins appended. 
+ """ + new_coins: List[Coin] = [] + spend_bundles: List[SpendBundle] = [] + to_spend = rng.sample(coins, min(5, len(coins))) + receiver = wallet.get_new_puzzlehash() + for c in to_spend: + bundle = wallet.generate_signed_transaction(uint64(c.amount // 2), receiver, c) + new_coins.extend(bundle.additions()) + spend_bundles.append(bundle) + coins.remove(c) + + coins.extend(new_coins) + + return SpendBundle.aggregate(spend_bundles), new_coins + + class BlockTools: """ Tools to generate blocks for testing. @@ -583,6 +606,8 @@ def get_consecutive_blocks( previous_generator: Optional[Union[CompressorArg, List[uint32]]] = None, genesis_timestamp: Optional[uint64] = None, force_plot_id: Optional[bytes32] = None, + dummy_block_references: bool = False, + include_transactions: bool = False, ) -> List[FullBlock]: assert num_blocks > 0 if block_list_input is not None: @@ -590,11 +615,42 @@ def get_consecutive_blocks( else: block_list = [] + tx_block_heights: List[uint32] = [] + if dummy_block_references: + # block references can only point to transaction blocks, so we need + # to record which ones are + for b in block_list: + if b.transactions_generator is not None: + tx_block_heights.append(b.height) + constants = self.constants transaction_data_included = False if time_per_block is None: time_per_block = float(constants.SUB_SLOT_TIME_TARGET) / float(constants.SLOT_BLOCKS_TARGET) + available_coins: List[Coin] = [] + pending_rewards: List[Coin] = [] + wallet: Optional[WalletTool] = None + rng: Optional[Random] = None + if include_transactions: + # when we generate transactions in the chain, the caller cannot also + # have ownership of the rewards and control the transactions + assert farmer_reward_puzzle_hash is None + assert pool_reward_puzzle_hash is None + assert transaction_data is None + + for b in block_list: + for coin in b.get_included_reward_coins(): + if coin.puzzle_hash == self.farmer_ph: + available_coins.append(coin) + print( + f"found {len(available_coins)} reward coins in existing chain." 
+ "for simplicity, we assume the rewards are all unspent in the original chain" + ) + wallet = self.get_farmer_wallet_tool() + rng = Random() + rng.seed(seed) + if farmer_reward_puzzle_hash is None: farmer_reward_puzzle_hash = self.farmer_ph @@ -727,6 +783,13 @@ def get_consecutive_blocks( if transaction_data is not None: additions = compute_additions_unchecked(transaction_data) removals = transaction_data.removals() + elif include_transactions: + assert wallet is not None + assert rng is not None + transaction_data, additions = make_spend_bundle(available_coins, wallet, rng) + removals = transaction_data.removals() + transaction_data_included = False + assert last_timestamp is not None if proof_of_space.pool_contract_puzzle_hash is not None: if pool_reward_puzzle_hash is not None: @@ -762,6 +825,23 @@ def get_consecutive_blocks( block_generator = None aggregate_signature = G2Element() + if dummy_block_references: + if block_generator is None: + program = SerializedProgram.from_bytes(solution_generator([])) + block_generator = BlockGenerator(program, [], []) + + if len(tx_block_heights) > 4: + block_refs = [ + tx_block_heights[1], + tx_block_heights[len(tx_block_heights) // 2], + tx_block_heights[-2], + ] + else: + block_refs = [] + block_generator = dataclasses.replace( + block_generator, block_height_list=block_generator.block_height_list + block_refs + ) + ( full_block, block_record, @@ -803,12 +883,25 @@ def get_consecutive_blocks( assert full_block.foliage_transaction_block is not None elif guarantee_transaction_block: continue - # print(f"{full_block.height}: difficulty {difficulty} " + # print(f"{full_block.height:4}: difficulty {difficulty} " # f"time: {new_timestamp - last_timestamp:0.2f} " + # f"additions: {len(additions) if block_record.is_transaction_block else 0:2} " + # f"removals: {len(removals) if block_record.is_transaction_block else 0:2} " + # f"refs: {len(full_block.transactions_generator_ref_list):3} " # f"tx: {block_record.is_transaction_block}") last_timestamp = new_timestamp block_list.append(full_block) + + if include_transactions: + for coin in full_block.get_included_reward_coins(): + if coin.puzzle_hash == self.farmer_ph: + pending_rewards.append(coin) + if full_block.is_transaction_block(): + available_coins.extend(pending_rewards) + pending_rewards = [] + if full_block.transactions_generator is not None: + tx_block_heights.append(full_block.height) compressor_arg = detect_potential_template_generator( full_block.height, full_block.transactions_generator ) @@ -972,6 +1065,12 @@ def get_consecutive_blocks( if transaction_data is not None: additions = compute_additions_unchecked(transaction_data) removals = transaction_data.removals() + elif include_transactions: + assert wallet is not None + assert rng is not None + transaction_data, additions = make_spend_bundle(available_coins, wallet, rng) + removals = transaction_data.removals() + transaction_data_included = False sub_slots_finished += 1 self.log.info( f"Sub slot finished. 
blocks included: {blocks_added_this_sub_slot} blocks_per_slot: " @@ -1050,6 +1149,23 @@ def get_consecutive_blocks( block_generator = None aggregate_signature = G2Element() + if dummy_block_references: + if block_generator is None: + program = SerializedProgram.from_bytes(solution_generator([])) + block_generator = BlockGenerator(program, [], []) + + if len(tx_block_heights) > 4: + block_refs = [ + tx_block_heights[1], + tx_block_heights[len(tx_block_heights) // 2], + tx_block_heights[-2], + ] + else: + block_refs = [] + block_generator = dataclasses.replace( + block_generator, block_height_list=block_generator.block_height_list + block_refs + ) + ( full_block, block_record, @@ -1094,13 +1210,26 @@ def get_consecutive_blocks( assert full_block.foliage_transaction_block is not None elif guarantee_transaction_block: continue - # print(f"{full_block.height}: difficulty {difficulty} " + # print(f"{full_block.height:4}: difficulty {difficulty} " # f"time: {new_timestamp - last_timestamp:0.2f} " + # f"additions: {len(additions) if block_record.is_transaction_block else 0:2} " + # f"removals: {len(removals) if block_record.is_transaction_block else 0:2} " + # f"refs: {len(full_block.transactions_generator_ref_list):3} " # f"tx: {block_record.is_transaction_block}") last_timestamp = new_timestamp block_list.append(full_block) + + if include_transactions: + for coin in full_block.get_included_reward_coins(): + if coin.puzzle_hash == self.farmer_ph: + pending_rewards.append(coin) + if full_block.is_transaction_block(): + available_coins.extend(pending_rewards) + pending_rewards = [] + if full_block.transactions_generator is not None: + tx_block_heights.append(full_block.height) compressor_arg = detect_potential_template_generator( full_block.height, full_block.transactions_generator ) diff --git a/chia/util/block_cache.py b/chia/util/block_cache.py index e910a8276981..4c4d169aa9ec 100644 --- a/chia/util/block_cache.py +++ b/chia/util/block_cache.py @@ -59,6 +59,9 @@ def height_to_hash(self, height: uint32) -> Optional[bytes32]: def contains_block(self, header_hash: bytes32) -> bool: return header_hash in self._block_records + async def contains_block_from_db(self, header_hash: bytes32) -> bool: + return header_hash in self._block_records + def contains_height(self, height: uint32) -> bool: return height in self._height_to_hash @@ -74,6 +77,12 @@ async def get_block_records_at(self, heights: List[uint32]) -> List[BlockRecord] async def get_block_record_from_db(self, header_hash: bytes32) -> Optional[BlockRecord]: return self._block_records[header_hash] + async def prev_block_hash(self, header_hashes: List[bytes32]) -> List[bytes32]: + ret = [] + for h in header_hashes: + ret.append(self._block_records[h].prev_hash) + return ret + def remove_block_record(self, header_hash: bytes32) -> None: del self._block_records[header_hash] diff --git a/chia/wallet/wallet_blockchain.py b/chia/wallet/wallet_blockchain.py index 2b02927ef057..dbd93b61557a 100644 --- a/chia/wallet/wallet_blockchain.py +++ b/chia/wallet/wallet_blockchain.py @@ -132,7 +132,7 @@ async def add_block(self, block: HeaderBlock) -> Tuple[AddBlockResult, Optional[ if block_record.prev_hash == self._peak.header_hash: fork_height: int = self._peak.height else: - fork_height = find_fork_point_in_chain(self, block_record, self._peak) + fork_height = await find_fork_point_in_chain(self, block_record, self._peak) await self._rollback_to_height(fork_height) curr_record: BlockRecord = block_record latest_timestamp = self._latest_timestamp @@ -193,6 
+193,11 @@ def get_latest_timestamp(self) -> uint64:
     def contains_block(self, header_hash: bytes32) -> bool:
         return header_hash in self._block_records
 
+    async def contains_block_from_db(self, header_hash: bytes32) -> bool:
+        # the wallet doesn't have the blockchain DB; this implements the
+        # blockchain_interface
+        return header_hash in self._block_records
+
     def contains_height(self, height: uint32) -> bool:
         return height in self._height_to_hash
 
@@ -200,13 +205,22 @@ def height_to_hash(self, height: uint32) -> bytes32:
         return self._height_to_hash[height]
 
     def try_block_record(self, header_hash: bytes32) -> Optional[BlockRecord]:
-        if self.contains_block(header_hash):
-            return self.block_record(header_hash)
-        return None
+        return self._block_records.get(header_hash)
 
     def block_record(self, header_hash: bytes32) -> BlockRecord:
         return self._block_records[header_hash]
 
+    async def get_block_record_from_db(self, header_hash: bytes32) -> Optional[BlockRecord]:
+        # the wallet doesn't have the blockchain DB; this implements the
+        # blockchain_interface
+        return self._block_records.get(header_hash)
+
+    async def prev_block_hash(self, header_hashes: List[bytes32]) -> List[bytes32]:
+        ret = []
+        for h in header_hashes:
+            ret.append(self._block_records[h].prev_hash)
+        return ret
+
     def add_block_record(self, block_record: BlockRecord) -> None:
         self._block_records[block_record.header_hash] = block_record
 
diff --git a/tests/blockchain/blockchain_test_utils.py b/tests/blockchain/blockchain_test_utils.py
index db2494e9f9aa..19654b37ec56 100644
--- a/tests/blockchain/blockchain_test_utils.py
+++ b/tests/blockchain/blockchain_test_utils.py
@@ -2,11 +2,12 @@
 
 from typing import List, Optional
 
+from chia.consensus.block_body_validation import ForkInfo
 from chia.consensus.blockchain import AddBlockResult, Blockchain
 from chia.consensus.multiprocess_validation import PreValidationResult
 from chia.types.full_block import FullBlock
 from chia.util.errors import Err
-from chia.util.ints import uint32, uint64
+from chia.util.ints import uint64
 
 
 async def check_block_store_invariant(bc: Blockchain):
@@ -44,7 +45,7 @@ async def _validate_and_add_block(
     expected_result: Optional[AddBlockResult] = None,
     expected_error: Optional[Err] = None,
     skip_prevalidation: bool = False,
-    fork_point_with_peak: Optional[uint32] = None,
+    fork_info: Optional[ForkInfo] = None,
 ) -> None:
     # Tries to validate and add the block, and checks that there are no errors in the process and that the
     # block is added to the peak.
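To make the new parameter concrete, here is a sketch of how callers are expected to seed the fork context when validating blocks on a side chain; the names blocks, fork_height and next_block are illustrative placeholders, not part of this patch, and the pattern follows the reorg tests further down:

    # fork_block is the last block shared by both chains. The ForkInfo starts
    # out "empty" (peak_height == fork_height); additions and removals are then
    # accumulated from fork_height + 1 onwards as each block is validated.
    fork_block = blocks[fork_height]
    fork_info = ForkInfo(fork_block.height, fork_block.height, fork_block.header_hash)
    await _validate_and_add_block(blockchain, next_block, fork_info=fork_info)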
@@ -81,7 +82,7 @@ async def _validate_and_add_block( result, err, _, - ) = await blockchain.add_block(block, results, fork_point_with_peak=fork_point_with_peak) + ) = await blockchain.add_block(block, results, fork_info=fork_info) await check_block_store_invariant(blockchain) if expected_error is None and expected_result != AddBlockResult.INVALID_BLOCK: @@ -107,11 +108,15 @@ async def _validate_and_add_block( async def _validate_and_add_block_multi_error( - blockchain: Blockchain, block: FullBlock, expected_errors: List[Err], skip_prevalidation: bool = False + blockchain: Blockchain, + block: FullBlock, + expected_errors: List[Err], + skip_prevalidation: bool = False, + fork_info: Optional[ForkInfo] = None, ) -> None: # Checks that the blockchain returns one of the expected errors try: - await _validate_and_add_block(blockchain, block, skip_prevalidation=skip_prevalidation) + await _validate_and_add_block(blockchain, block, skip_prevalidation=skip_prevalidation, fork_info=fork_info) except Exception as e: assert isinstance(e, AssertionError) assert e.args[0] in expected_errors @@ -124,13 +129,16 @@ async def _validate_and_add_block_multi_result( blockchain: Blockchain, block: FullBlock, expected_result: List[AddBlockResult], - skip_prevalidation: Optional[bool] = None, + skip_prevalidation: bool = False, + fork_info: Optional[ForkInfo] = None, ) -> None: try: - if skip_prevalidation is not None: - await _validate_and_add_block(blockchain, block, skip_prevalidation=skip_prevalidation) - else: - await _validate_and_add_block(blockchain, block) + await _validate_and_add_block( + blockchain, + block, + skip_prevalidation=skip_prevalidation, + fork_info=fork_info, + ) except Exception as e: assert isinstance(e, AssertionError) assert "Block was not added" in e.args[0] @@ -140,7 +148,10 @@ async def _validate_and_add_block_multi_result( async def _validate_and_add_block_no_error( - blockchain: Blockchain, block: FullBlock, skip_prevalidation: Optional[bool] = None + blockchain: Blockchain, + block: FullBlock, + skip_prevalidation: bool = False, + fork_info: Optional[ForkInfo] = None, ) -> None: # adds a block and ensures that there is no error. 
However, does not ensure that block extended the peak of # the blockchain @@ -153,4 +164,5 @@ async def _validate_and_add_block_no_error( AddBlockResult.ADDED_AS_ORPHAN, ], skip_prevalidation=skip_prevalidation, + fork_info=fork_info, ) diff --git a/tests/blockchain/config.py b/tests/blockchain/config.py index 3fbb3269e629..b593bfe59ade 100644 --- a/tests/blockchain/config.py +++ b/tests/blockchain/config.py @@ -1,4 +1,4 @@ from __future__ import annotations -job_timeout = 60 +job_timeout = 70 checkout_blocks_and_plots = True diff --git a/tests/blockchain/test_blockchain.py b/tests/blockchain/test_blockchain.py index 6a99a53a4969..981e102ce770 100644 --- a/tests/blockchain/test_blockchain.py +++ b/tests/blockchain/test_blockchain.py @@ -7,15 +7,16 @@ import time from contextlib import asynccontextmanager from dataclasses import replace -from typing import List +from typing import List, Optional import pytest from chia_rs import AugSchemeMPL, G2Element from clvm.casts import int_to_bytes +from chia.consensus.block_body_validation import ForkInfo from chia.consensus.block_header_validation import validate_finished_header_block from chia.consensus.block_rewards import calculate_base_farmer_reward -from chia.consensus.blockchain import AddBlockResult +from chia.consensus.blockchain import AddBlockResult, Blockchain from chia.consensus.coinbase import create_farmer_coin from chia.consensus.constants import ConsensusConstants from chia.consensus.multiprocess_validation import PreValidationResult @@ -55,6 +56,7 @@ _validate_and_add_block_multi_error, _validate_and_add_block_multi_result, _validate_and_add_block_no_error, + check_block_store_invariant, ) from tests.conftest import ConsensusMode from tests.util.blockchain import create_blockchain @@ -3132,48 +3134,147 @@ async def test_basic_reorg(self, empty_blockchain, bt): assert b.get_peak().height == 16 @pytest.mark.anyio - async def test_long_reorg(self, empty_blockchain, default_1500_blocks, test_long_reorg_blocks, bt): + @pytest.mark.parametrize("light_blocks", [True, False]) + async def test_long_reorg( + self, + light_blocks: bool, + empty_blockchain: Blockchain, + default_10000_blocks: List[FullBlock], + test_long_reorg_blocks: List[FullBlock], + test_long_reorg_blocks_light: List[FullBlock], + ): + if light_blocks: + reorg_blocks = test_long_reorg_blocks_light[:1650] + else: + reorg_blocks = test_long_reorg_blocks[:1200] + # Reorg longer than a difficulty adjustment # Also tests higher weight chain but lower height b = empty_blockchain - num_blocks_chain_1 = 3 * bt.constants.EPOCH_BLOCKS + bt.constants.MAX_SUB_SLOT_BLOCKS + 10 - num_blocks_chain_2_start = bt.constants.EPOCH_BLOCKS - 20 + num_blocks_chain_1 = 1600 + num_blocks_chain_2_start = 500 assert num_blocks_chain_1 < 10000 - blocks = default_1500_blocks[:num_blocks_chain_1] + blocks = default_10000_blocks[:num_blocks_chain_1] - for block in blocks: - await _validate_and_add_block(b, block, skip_prevalidation=True) - chain_1_height = b.get_peak().height - chain_1_weight = b.get_peak().weight + print(f"pre-validating {len(blocks)} blocks") + pre_validation_results: List[PreValidationResult] = await b.pre_validate_blocks_multiprocessing( + blocks, {}, validate_signatures=False + ) + + for i, block in enumerate(blocks): + assert pre_validation_results[i].error is None + if (block.height % 100) == 0: + print(f"main chain: {block.height:4} weight: {block.weight}") + (result, err, _) = await b.add_block(block, pre_validation_results[i]) + await check_block_store_invariant(b) + assert err 
is None
+            assert result == AddBlockResult.NEW_PEAK
+
+        peak = b.get_peak()
+        assert peak is not None
+        chain_1_height = peak.height
+        chain_1_weight = peak.weight
         assert chain_1_height == (num_blocks_chain_1 - 1)
 
         # The reorg blocks will have less time between them (timestamp) and therefore will make difficulty go up
         # This means that the weight will grow faster, and we can get a heavier chain with lower height
-        # If these assert fail, you probably need to change the fixture in test_long_reorg_blocks to create the
+        # If these asserts fail, you probably need to change the fixture in reorg_blocks to create the
         # right amount of blocks at the right time
-        assert test_long_reorg_blocks[num_blocks_chain_2_start - 1] == default_1500_blocks[num_blocks_chain_2_start - 1]
-        assert test_long_reorg_blocks[num_blocks_chain_2_start] != default_1500_blocks[num_blocks_chain_2_start]
+        assert reorg_blocks[num_blocks_chain_2_start - 1] == default_10000_blocks[num_blocks_chain_2_start - 1]
+        assert reorg_blocks[num_blocks_chain_2_start] != default_10000_blocks[num_blocks_chain_2_start]
+
+        # one aspect of this test is to make sure we can reorg blocks that are
+        # not in the cache. We need to explicitly prune the cache to get that
+        # effect.
+        b.clean_block_records()
+
+        first_peak = b.get_peak()
+        fork_info: Optional[ForkInfo] = None
+        for reorg_block in reorg_blocks:
+            if (reorg_block.height % 100) == 0:
+                peak = b.get_peak()
+                assert peak is not None
+                print(
+                    f"reorg chain: {reorg_block.height:4} "
+                    f"weight: {reorg_block.weight:7} "
+                    f"peak: {str(peak.header_hash)[:6]}"
+                )
 
-        for reorg_block in test_long_reorg_blocks:
             if reorg_block.height < num_blocks_chain_2_start:
-                await _validate_and_add_block(
-                    b, reorg_block, expected_result=AddBlockResult.ALREADY_HAVE_BLOCK, skip_prevalidation=True
-                )
+                await _validate_and_add_block(b, reorg_block, expected_result=AddBlockResult.ALREADY_HAVE_BLOCK)
             elif reorg_block.weight <= chain_1_weight:
-                await _validate_and_add_block_multi_result(
-                    b,
-                    reorg_block,
-                    [AddBlockResult.ADDED_AS_ORPHAN, AddBlockResult.ALREADY_HAVE_BLOCK],
-                    skip_prevalidation=True,
+                if fork_info is None:
+                    fork_info = ForkInfo(reorg_block.height - 1, reorg_block.height - 1, reorg_block.prev_header_hash)
+                await _validate_and_add_block(
+                    b, reorg_block, expected_result=AddBlockResult.ADDED_AS_ORPHAN, fork_info=fork_info
                 )
             elif reorg_block.weight > chain_1_weight:
-                assert reorg_block.height < chain_1_height
-                await _validate_and_add_block(b, reorg_block, skip_prevalidation=True)
+                await _validate_and_add_block(
+                    b, reorg_block, expected_result=AddBlockResult.NEW_PEAK, fork_info=fork_info
+                )
+
+        # if these asserts fire, there was no reorg
+        peak = b.get_peak()
+        assert peak is not None
+        assert first_peak != peak
+        assert peak.weight > chain_1_weight
+        second_peak = peak
+
+        if light_blocks:
+            assert peak.height > chain_1_height
+        else:
+            assert peak.height < chain_1_height
+
+        chain_2_weight = peak.weight
 
-        assert b.get_peak().weight > chain_1_weight
-        assert b.get_peak().height < chain_1_height
+        # now reorg back to the original chain
+        # this exercises the case where we have some of the blocks in the DB already
+        b.clean_block_records()
+
+        if light_blocks:
+            blocks = default_10000_blocks[num_blocks_chain_2_start - 100 : 1800]
+        else:
+            blocks = default_10000_blocks[num_blocks_chain_2_start - 100 : 2600]
+
+        # the block validation requires previous block records to be in the
+        # cache
+        br = await b.get_block_record_from_db(blocks[0].prev_header_hash)
+        for i in range(200):
+            assert br is not None
+            b.add_block_record(br)
+            br = await b.get_block_record_from_db(br.prev_hash)
+        assert br is not None
+        b.add_block_record(br)
+
+        # start the fork point a few blocks back, to test that the blockchain
+        # can catch up
+        fork_block = default_10000_blocks[num_blocks_chain_2_start - 200]
+        fork_info = ForkInfo(fork_block.height, fork_block.height, fork_block.header_hash)
+        for block in blocks:
+            if (block.height % 128) == 0:
+                peak = b.get_peak()
+                assert peak is not None
+                print(
+                    f"original chain: {block.height:4} "
+                    f"weight: {block.weight:7} "
+                    f"peak: {str(peak.header_hash)[:6]}"
+                )
+            if block.height <= chain_1_height:
+                expect = AddBlockResult.ALREADY_HAVE_BLOCK
+            elif block.weight < chain_2_weight:
+                expect = AddBlockResult.ADDED_AS_ORPHAN
+            else:
+                expect = AddBlockResult.NEW_PEAK
+            await _validate_and_add_block(b, block, fork_info=fork_info, expected_result=expect)
+
+        # if these asserts fire, there was no reorg back to the original chain
+        peak = b.get_peak()
+        assert peak is not None
+        assert peak.header_hash != second_peak.header_hash
+        assert peak.weight > chain_2_weight
 
     @pytest.mark.anyio
     async def test_long_compact_blockchain(self, empty_blockchain, default_2000_blocks_compact):
@@ -3369,15 +3470,16 @@ async def test_reorg_new_ref(empty_blockchain, bt):
         blocks_reorg_chain = bt.get_consecutive_blocks(4, blocks_reorg_chain, seed=b"2")
 
         for i, block in enumerate(blocks_reorg_chain):
-            fork_point_with_peak = None
+            fork_info: Optional[ForkInfo] = None
             if i < 10:
                 expected = AddBlockResult.ALREADY_HAVE_BLOCK
             elif i < 20:
                 expected = AddBlockResult.ADDED_AS_ORPHAN
             else:
                 expected = AddBlockResult.NEW_PEAK
-                fork_point_with_peak = uint32(1)
-            await _validate_and_add_block(b, block, expected_result=expected, fork_point_with_peak=fork_point_with_peak)
+                if fork_info is None:
+                    fork_info = ForkInfo(blocks[1].height, blocks[1].height, blocks[1].header_hash)
+            await _validate_and_add_block(b, block, expected_result=expected, fork_info=fork_info)
 
         assert b.get_peak().height == 20
 
@@ -3425,9 +3527,10 @@ async def test_reorg_stale_fork_height(empty_blockchain, bt):
     for block in blocks[:5]:
         await _validate_and_add_block(b, block, expected_result=AddBlockResult.NEW_PEAK)
 
-    # fake the fork_height to make every new block look like a reorg
+    # fake the fork_info to make every new block look like a reorg
+    fork_info = ForkInfo(blocks[1].height, blocks[1].height, blocks[1].header_hash)
     for block in blocks[5:]:
-        await _validate_and_add_block(b, block, expected_result=AddBlockResult.NEW_PEAK, fork_point_with_peak=2)
+        await _validate_and_add_block(b, block, expected_result=AddBlockResult.NEW_PEAK, fork_info=fork_info)
 
     assert b.get_peak().height == 13
 
@@ -3579,17 +3682,15 @@ async def test_reorg_flip_flop(empty_blockchain, bt):
         block1, block2 = b1, b2
         counter += 1
 
-        fork_height = 2 if counter > 3 else None
-
         preval: List[PreValidationResult] = await b.pre_validate_blocks_multiprocessing(
             [block1], {}, validate_signatures=False
         )
-        result, err, _ = await b.add_block(block1, preval[0], fork_point_with_peak=fork_height)
+        result, err, _ = await b.add_block(block1, preval[0])
         assert not err
         preval: List[PreValidationResult] = await b.pre_validate_blocks_multiprocessing(
             [block2], {}, validate_signatures=False
         )
-        result, err, _ = await b.add_block(block2, preval[0], fork_point_with_peak=fork_height)
+        result, err, _ = await b.add_block(block2, preval[0])
         assert not err
 
     assert b.get_peak().height == 39
 
diff --git a/tests/blockchain/test_lookup_fork_chain.py 
b/tests/blockchain/test_lookup_fork_chain.py new file mode 100644 index 000000000000..3734e65d234c --- /dev/null +++ b/tests/blockchain/test_lookup_fork_chain.py @@ -0,0 +1,186 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, List + +import pytest + +from benchmarks.utils import rand_hash +from chia.consensus.block_record import BlockRecord +from chia.consensus.blockchain_interface import BlockchainInterface +from chia.consensus.find_fork_point import find_fork_point_in_chain, lookup_fork_chain +from chia.types.blockchain_format.sized_bytes import bytes32 +from chia.util.ints import uint32 + + +class DummyChain: + _chain: Dict[bytes32, bytes32] + + def __init__(self) -> None: + self._chain = {} + + def add_block(self, h: bytes32, prev: bytes32) -> None: + self._chain[h] = prev + + async def prev_block_hash(self, header_hashes: List[bytes32]) -> List[bytes32]: + ret: List[bytes32] = [] + for h in header_hashes: + ret.append(self._chain[h]) + return ret + + +A = rand_hash() +B = rand_hash() +C = rand_hash() +D = rand_hash() +E = rand_hash() +F = rand_hash() +G = rand_hash() +H = rand_hash() + +dummy_chain = DummyChain() +dummy_chain.add_block(G, H) +dummy_chain.add_block(D, G) +dummy_chain.add_block(C, D) +dummy_chain.add_block(B, C) +dummy_chain.add_block(A, B) +dummy_chain.add_block(E, D) +dummy_chain.add_block(F, E) + +test_chain: BlockchainInterface = dummy_chain # type: ignore[assignment] + +# A +# | +# v +# B F +# | | +# v v +# C E +# \ / +# v v +# D +# | +# v +# G + + +@dataclass +class FakeBlockRecord: + height: uint32 + header_hash: bytes32 + prev_hash: bytes32 + + +def BR(height: int, header_hash: bytes32, prev_hash: bytes32) -> BlockRecord: + ret = FakeBlockRecord(uint32(height), header_hash, prev_hash) + return ret # type: ignore[return-value] + + +@pytest.mark.anyio +async def test_no_fork() -> None: + chain, fork_hash = await lookup_fork_chain(test_chain, (uint32(42), A), (uint32(42), A)) + assert chain == {} + assert fork_hash == A + + fork_height = await find_fork_point_in_chain(test_chain, BR(42, A, B), BR(42, A, B)) + assert fork_height == 42 + + +@pytest.mark.anyio +async def test_fork_left() -> None: + chain, fork_hash = await lookup_fork_chain(test_chain, (uint32(42), A), (uint32(41), F)) + assert chain == {uint32(40): E, uint32(41): F} + assert fork_hash == D + + fork_height = await find_fork_point_in_chain(test_chain, BR(42, A, B), BR(41, F, E)) + assert fork_height == 39 + + +@pytest.mark.anyio +async def test_fork_left_short() -> None: + chain, fork_hash = await lookup_fork_chain(test_chain, (uint32(41), B), (uint32(41), F)) + assert chain == {uint32(40): E, uint32(41): F} + assert fork_hash == D + + fork_height = await find_fork_point_in_chain(test_chain, BR(41, B, C), BR(41, F, E)) + assert fork_height == 39 + + +@pytest.mark.anyio +async def test_fork_right() -> None: + chain, fork_hash = await lookup_fork_chain(test_chain, (uint32(41), F), (uint32(42), A)) + assert chain == {uint32(40): C, uint32(41): B, uint32(42): A} + assert fork_hash == D + + fork_height = await find_fork_point_in_chain(test_chain, BR(41, F, E), BR(42, A, B)) + assert fork_height == 39 + + +@pytest.mark.anyio +async def test_fork_right_short() -> None: + chain, fork_hash = await lookup_fork_chain(test_chain, (uint32(41), F), (uint32(41), B)) + assert chain == {uint32(40): C, uint32(41): B} + assert fork_hash == D + + fork_height = await find_fork_point_in_chain(test_chain, BR(41, F, E), BR(41, B, C)) + assert fork_height == 39 + + 
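+# a sketch of one additional case, assuming the same semantics as
+# test_fork_left_short one level closer to the fork point: C and E are
+# siblings whose shared parent is D, so only the right-hand block E is
+# returned and D (at relative height 39) is the fork block
+@pytest.mark.anyio
+async def test_fork_siblings() -> None:
+    chain, fork_hash = await lookup_fork_chain(test_chain, (uint32(40), C), (uint32(40), E))
+    assert chain == {uint32(40): E}
+    assert fork_hash == D
+
+    fork_height = await find_fork_point_in_chain(test_chain, BR(40, C, D), BR(40, E, D))
+    assert fork_height == 39
+
+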
+@pytest.mark.anyio +async def test_linear_long() -> None: + chain, fork_hash = await lookup_fork_chain(test_chain, (uint32(39), D), (uint32(42), A)) + assert chain == {uint32(40): C, uint32(41): B, uint32(42): A} + assert fork_hash == D + + fork_height = await find_fork_point_in_chain(test_chain, BR(39, D, G), BR(42, A, B)) + assert fork_height == 39 + + +@pytest.mark.anyio +async def test_linear_short() -> None: + chain, fork_hash = await lookup_fork_chain(test_chain, (uint32(42), A), (uint32(39), D)) + assert chain == {} + assert fork_hash == D + + fork_height = await find_fork_point_in_chain(test_chain, BR(42, A, B), BR(39, D, G)) + assert fork_height == 39 + + +@pytest.mark.anyio +async def test_no_shared_left() -> None: + chain, fork_hash = await lookup_fork_chain(test_chain, (uint32(1), F), (uint32(1), B)) + assert chain == {uint32(0): C, uint32(1): B} + assert fork_hash == bytes32([0] * 32) + + fork_height = await find_fork_point_in_chain(test_chain, BR(1, F, E), BR(1, B, C)) + assert fork_height == -1 + + +@pytest.mark.anyio +async def test_no_shared_right() -> None: + chain, fork_hash = await lookup_fork_chain(test_chain, (uint32(1), B), (uint32(1), F)) + assert chain == {uint32(0): E, uint32(1): F} + assert fork_hash == bytes32([0] * 32) + + fork_height = await find_fork_point_in_chain(test_chain, BR(1, B, C), BR(1, F, E)) + assert fork_height == -1 + + +@pytest.mark.anyio +async def test_root_shared_left() -> None: + chain, fork_hash = await lookup_fork_chain(test_chain, (uint32(2), F), (uint32(2), B)) + assert chain == {uint32(1): C, uint32(2): B} + assert fork_hash == D + + fork_height = await find_fork_point_in_chain(test_chain, BR(2, F, E), BR(2, B, C)) + assert fork_height == 0 + + +@pytest.mark.anyio +async def test_root_shared_right() -> None: + chain, fork_hash = await lookup_fork_chain(test_chain, (uint32(2), B), (uint32(2), F)) + assert chain == {uint32(1): E, uint32(2): F} + assert fork_hash == D + + fork_height = await find_fork_point_in_chain(test_chain, BR(2, B, C), BR(2, F, E)) + assert fork_height == 0 diff --git a/tests/conftest.py b/tests/conftest.py index 9b05f46a309d..46a549a414c8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -237,7 +237,7 @@ def softfork_height(request) -> int: return request.param -saved_blocks_version = "rc5" +saved_blocks_version = "2.0" @pytest.fixture(scope="session") @@ -298,11 +298,19 @@ def default_10000_blocks(bt, consensus_mode): if consensus_mode == ConsensusMode.HARD_FORK_2_0: version = "_hardfork" - return persistent_blocks(10000, f"test_blocks_10000_{saved_blocks_version}{version}.db", bt, seed=b"10000") + return persistent_blocks( + 10000, + f"test_blocks_10000_{saved_blocks_version}{version}.db", + bt, + seed=b"10000", + dummy_block_references=True, + ) +# this long reorg chain shares the first 500 blocks with "default_10000_blocks" +# and has heavier weight blocks @pytest.fixture(scope="session") -def test_long_reorg_blocks(bt, consensus_mode, default_1500_blocks): +def test_long_reorg_blocks(bt, consensus_mode, default_10000_blocks): version = "" if consensus_mode == ConsensusMode.HARD_FORK_2_0: version = "_hardfork" @@ -310,12 +318,35 @@ def test_long_reorg_blocks(bt, consensus_mode, default_1500_blocks): from tests.util.blockchain import persistent_blocks return persistent_blocks( - 758, + 4500, f"test_blocks_long_reorg_{saved_blocks_version}{version}.db", bt, - block_list_input=default_1500_blocks[:320], + block_list_input=default_10000_blocks[:500], seed=b"reorg_blocks", time_per_block=8, + 
dummy_block_references=True, + include_transactions=True, + ) + + +# this long reorg chain shares the first 500 blocks with "default_10000_blocks" +# and has the same weight blocks +@pytest.fixture(scope="session") +def test_long_reorg_blocks_light(bt, consensus_mode, default_10000_blocks): + version = "" + if consensus_mode == ConsensusMode.HARD_FORK_2_0: + version = "_hardfork" + + from tests.util.blockchain import persistent_blocks + + return persistent_blocks( + 4500, + f"test_blocks_long_reorg_light_{saved_blocks_version}{version}.db", + bt, + block_list_input=default_10000_blocks[:500], + seed=b"reorg_blocks2", + dummy_block_references=True, + include_transactions=True, ) diff --git a/tests/core/full_node/stores/test_block_store.py b/tests/core/full_node/stores/test_block_store.py index 64c825f5272a..244a4bcca729 100644 --- a/tests/core/full_node/stores/test_block_store.py +++ b/tests/core/full_node/stores/test_block_store.py @@ -443,3 +443,35 @@ async def test_get_peak(tmp_dir: Path, db_version: int, use_cache: bool) -> None block_hash, height = res assert block_hash == b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" assert height == 1337 + + +@pytest.mark.limit_consensus_modes(reason="save time") +@pytest.mark.anyio +async def test_get_prev_hash(tmp_dir: Path, bt: BlockTools, db_version: int, use_cache: bool) -> None: + assert sqlite3.threadsafety >= 1 + blocks = bt.get_consecutive_blocks(10) + + async with DBConnection(db_version) as db_wrapper, DBConnection(db_version) as db_wrapper_2: + # Use a different file for the blockchain + coin_store_2 = await CoinStore.create(db_wrapper_2) + store_2 = await BlockStore.create(db_wrapper_2, use_cache=use_cache) + bc = await Blockchain.create(coin_store_2, store_2, bt.constants, tmp_dir, 2) + + store = await BlockStore.create(db_wrapper, use_cache=use_cache) + await BlockStore.create(db_wrapper_2) + + # Save/get block + for block in blocks: + await _validate_and_add_block(bc, block) + block_record = bc.block_record(block.header_hash) + await store.add_full_block(block.header_hash, block, block_record) + + for i, block in enumerate(blocks): + prev_hash = await store.get_prev_hash(block.header_hash) + if i == 0: + assert prev_hash == bt.constants.GENESIS_CHALLENGE + else: + assert prev_hash == blocks[i - 1].header_hash + + with pytest.raises(KeyError, match="missing block in chain"): + await store.get_prev_hash(bytes32.from_bytes(b"yolo" * 8)) diff --git a/tests/core/full_node/stores/test_full_node_store.py b/tests/core/full_node/stores/test_full_node_store.py index ad6c1c48b4d6..07d9ac841167 100644 --- a/tests/core/full_node/stores/test_full_node_store.py +++ b/tests/core/full_node/stores/test_full_node_store.py @@ -289,7 +289,7 @@ async def test_basic_store( assert peak_here is not None if peak_here.header_hash == block.header_hash: sb = blockchain.block_record(block.header_hash) - fork = find_fork_point_in_chain(blockchain, peak, blockchain.block_record(sb.header_hash)) + fork = await find_fork_point_in_chain(blockchain, peak, blockchain.block_record(sb.header_hash)) if fork > 0: fork_block = blockchain.height_to_block_record(uint32(fork)) else: @@ -366,7 +366,7 @@ async def test_basic_store( assert peak_here is not None if peak_here.header_hash == blocks[-1].header_hash: sb = blockchain.block_record(blocks[-1].header_hash) - fork = find_fork_point_in_chain(blockchain, peak, blockchain.block_record(sb.header_hash)) + fork = await find_fork_point_in_chain(blockchain, peak, blockchain.block_record(sb.header_hash)) if fork > 0: fork_block = 
blockchain.height_to_block_record(uint32(fork))
         else:
diff --git a/tests/core/full_node/test_full_node.py b/tests/core/full_node/test_full_node.py
index a263453b913c..169cb3054857 100644
--- a/tests/core/full_node/test_full_node.py
+++ b/tests/core/full_node/test_full_node.py
@@ -12,6 +12,7 @@
 from chia_rs import AugSchemeMPL, G2Element, PrivateKey
 from clvm.casts import int_to_bytes
 
+from chia.consensus.block_body_validation import ForkInfo
 from chia.consensus.pot_iterations import is_overflow_block
 from chia.full_node.bundle_tools import detect_potential_template_generator
 from chia.full_node.full_node import WalletUpdate
@@ -2153,19 +2154,115 @@ async def test_wallet_sync_task_failure(
 
 
 @pytest.mark.anyio
-@pytest.mark.limit_consensus_modes(reason="this test is too slow (for now)")
+@pytest.mark.parametrize("light_blocks", [True, False])
+async def test_long_reorg(
+    light_blocks: bool,
+    one_node_one_block,
+    default_10000_blocks: List[FullBlock],
+    test_long_reorg_blocks: List[FullBlock],
+    test_long_reorg_blocks_light: List[FullBlock],
+    seeded_random: random.Random,
+):
+    node, server, bt = one_node_one_block
+
+    fork_point = 499
+    blocks = default_10000_blocks[:1600]
+
+    if light_blocks:
+        # if the blocks have lighter weight, we need more height to compensate,
+        # to force a reorg
+        reorg_blocks = test_long_reorg_blocks_light[:1650]
+    else:
+        reorg_blocks = test_long_reorg_blocks[:1200]
+
+    for block_batch in to_batches(blocks, 64):
+        b = block_batch.entries[0]
+        if (b.height % 128) == 0:
+            print(f"main chain: {b.height:4} weight: {b.weight}")
+        await node.full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 8884), None)
+
+    peak = node.full_node.blockchain.get_peak()
+    chain_1_height = peak.height
+    chain_1_weight = peak.weight
+    chain_1_peak = peak.header_hash
+
+    assert reorg_blocks[fork_point] == default_10000_blocks[fork_point]
+    assert reorg_blocks[fork_point + 1] != default_10000_blocks[fork_point + 1]
+
+    # one aspect of this test is to make sure we can reorg blocks that are
+    # not in the cache. We need to explicitly prune the cache to get that
+    # effect.
+    node.full_node.blockchain.clean_block_records()
+
+    fork_info: Optional[ForkInfo] = None
+    for b in reorg_blocks:
+        if (b.height % 128) == 0:
+            peak = node.full_node.blockchain.get_peak()
+            print(f"reorg chain: {b.height:4} " f"weight: {b.weight:7} " f"peak: {str(peak.header_hash)[:6]}")
+        if b.height > fork_point and fork_info is None:
+            fork_info = ForkInfo(fork_point, fork_point, reorg_blocks[fork_point].header_hash)
+        await node.full_node.add_block(b, fork_info=fork_info)
+
+    # if these asserts fire, there was no reorg
+    peak = node.full_node.blockchain.get_peak()
+    assert peak.header_hash != chain_1_peak
+    assert peak.weight > chain_1_weight
+    chain_2_weight = peak.weight
+    chain_2_peak = peak.header_hash
+
+    # if the reorg chain has lighter blocks, once we've re-orged onto it, we
+    # have a greater block height. If the reorg chain has heavier blocks, we
+    # end up with a lower height than the original chain (but greater weight)
+    if light_blocks:
+        assert peak.height > chain_1_height
+    else:
+        assert peak.height < chain_1_height
+
+    # now reorg back to the original chain
+    # this exercises the case where we have some of the blocks in the DB already
+    node.full_node.blockchain.clean_block_records()
+
+    if light_blocks:
+        blocks = default_10000_blocks[fork_point - 100 : 1800]
+    else:
+        blocks = default_10000_blocks[fork_point - 100 : 2600]
+
+    fork_block = blocks[0]
+    fork_info = ForkInfo(fork_block.height - 1, fork_block.height - 1, fork_block.prev_header_hash)
+    for b in blocks:
+        if (b.height % 128) == 0:
+            peak = node.full_node.blockchain.get_peak()
+            print(f"original chain: {b.height:4} " f"weight: {b.weight:7} " f"peak: {str(peak.header_hash)[:6]}")
+        await node.full_node.add_block(b, fork_info=fork_info)
+
+    # if these asserts fire, there was no reorg back to the original chain
+    peak = node.full_node.blockchain.get_peak()
+    assert peak.header_hash != chain_2_peak
+    assert peak.weight > chain_2_weight
+
+
+@pytest.mark.anyio
+@pytest.mark.parametrize("light_blocks", [True, False])
+@pytest.mark.parametrize("chain_length", [0, 100])
+@pytest.mark.limit_consensus_modes(allowed=[ConsensusMode.PLAIN], reason="save time")
 async def test_long_reorg_nodes(
+    light_blocks: bool,
+    chain_length: int,
     three_nodes,
     default_10000_blocks: List[FullBlock],
     test_long_reorg_blocks: List[FullBlock],
+    test_long_reorg_blocks_light: List[FullBlock],
     self_hostname: str,
     seeded_random: random.Random,
 ):
     full_node_1, full_node_2, full_node_3 = three_nodes
 
-    blocks = default_10000_blocks[:1600]
+    blocks = default_10000_blocks[: 1600 - chain_length]
 
-    reorg_blocks = test_long_reorg_blocks[:1200]
+    if light_blocks:
+        reorg_blocks = test_long_reorg_blocks_light[: 1600 - chain_length]
+    else:
+        reorg_blocks = test_long_reorg_blocks[: 1200 - chain_length]
 
     # full node 1 has the original chain
     for block_batch in to_batches(blocks, 64):
@@ -2193,7 +2290,7 @@ def check_nodes_in_sync():
         p2 = full_node_1.full_node.blockchain.get_peak()
         return p1 == p2
 
-    await time_out_assert(120, check_nodes_in_sync)
+    await time_out_assert(100, check_nodes_in_sync)
 
     peak = full_node_2.full_node.blockchain.get_peak()
     print(f"peak: {str(peak.header_hash)[:6]}")
@@ -2214,20 +2311,20 @@ def check_nodes_in_sync():
     print("connecting node 3")
     await connect_and_get_peer(full_node_3.full_node.server, full_node_1.full_node.server, self_hostname)
-    # await connect_and_get_peer(full_node_3.full_node.server, full_node_2.full_node.server, self_hostname)
+    await connect_and_get_peer(full_node_3.full_node.server, full_node_2.full_node.server, self_hostname)
 
     def check_nodes_in_sync2():
         p1 = full_node_1.full_node.blockchain.get_peak()
-        # p2 = full_node_2.full_node.blockchain.get_peak()
+        p2 = full_node_2.full_node.blockchain.get_peak()
         p3 = full_node_3.full_node.blockchain.get_peak()
-        return p1 == p3
+        return p1 == p3 and p1 == p2
 
-    await time_out_assert(2000, check_nodes_in_sync2)
+    await time_out_assert(900, check_nodes_in_sync2)
 
     p1 = full_node_1.full_node.blockchain.get_peak()
-    # p2 = full_node_2.full_node.blockchain.get_peak()
+    p2 = full_node_2.full_node.blockchain.get_peak()
     p3 = full_node_3.full_node.blockchain.get_peak()
 
     assert p1.header_hash == blocks[-1].header_hash
-    # assert p2.header_hash == blocks[-1].header_hash
+    assert p2.header_hash == blocks[-1].header_hash
     assert p3.header_hash == blocks[-1].header_hash
 
diff --git 
a/tests/plot_sync/util.py b/tests/plot_sync/util.py index 9afdb1bcd4d6..1a6daf27c7b8 100644 --- a/tests/plot_sync/util.py +++ b/tests/plot_sync/util.py @@ -28,6 +28,9 @@ class WSChiaConnectionDummy: async def send_message(self, message: Message) -> None: self.last_sent_message = message + def get_peer_logging(self) -> PeerInfo: + return self.peer_info + def get_dummy_connection(node_type: NodeType, peer_id: bytes32) -> WSChiaConnectionDummy: return WSChiaConnectionDummy(node_type, peer_id) diff --git a/tests/util/blockchain.py b/tests/util/blockchain.py index 1222e51bd0f6..0fd7f9878ea1 100644 --- a/tests/util/blockchain.py +++ b/tests/util/blockchain.py @@ -21,7 +21,7 @@ async def create_blockchain(constants: ConsensusConstants, db_version: int) -> T coin_store = await CoinStore.create(wrapper) store = await BlockStore.create(wrapper) - bc1 = await Blockchain.create(coin_store, store, constants, Path("."), 2) + bc1 = await Blockchain.create(coin_store, store, constants, Path("."), 2, single_threaded=True) assert bc1.get_peak() is None return bc1, wrapper @@ -32,12 +32,15 @@ def persistent_blocks( bt: BlockTools, seed: bytes = b"", empty_sub_slots: int = 0, + *, normalized_to_identity_cc_eos: bool = False, normalized_to_identity_icc_eos: bool = False, normalized_to_identity_cc_sp: bool = False, normalized_to_identity_cc_ip: bool = False, block_list_input: Optional[List[FullBlock]] = None, time_per_block: Optional[float] = None, + dummy_block_references: bool = False, + include_transactions: bool = False, ) -> List[FullBlock]: # try loading from disc, if not create new blocks.db file # TODO hash fixtures.py and blocktool.py, add to path, delete if the files changed @@ -77,10 +80,12 @@ def persistent_blocks( bt, block_list_input, time_per_block, - normalized_to_identity_cc_eos, - normalized_to_identity_icc_eos, - normalized_to_identity_cc_sp, - normalized_to_identity_cc_ip, + normalized_to_identity_cc_eos=normalized_to_identity_cc_eos, + normalized_to_identity_icc_eos=normalized_to_identity_icc_eos, + normalized_to_identity_cc_sp=normalized_to_identity_cc_sp, + normalized_to_identity_cc_ip=normalized_to_identity_cc_ip, + dummy_block_references=dummy_block_references, + include_transactions=include_transactions, ) @@ -92,10 +97,13 @@ def new_test_db( bt: BlockTools, block_list_input: List[FullBlock], time_per_block: Optional[float], + *, normalized_to_identity_cc_eos: bool = False, # CC_EOS, normalized_to_identity_icc_eos: bool = False, # ICC_EOS normalized_to_identity_cc_sp: bool = False, # CC_SP, normalized_to_identity_cc_ip: bool = False, # CC_IP + dummy_block_references: bool = False, + include_transactions: bool = False, ) -> List[FullBlock]: print(f"create {path} with {num_of_blocks} blocks with ") blocks: List[FullBlock] = bt.get_consecutive_blocks( @@ -108,6 +116,8 @@ def new_test_db( normalized_to_identity_icc_eos=normalized_to_identity_icc_eos, normalized_to_identity_cc_sp=normalized_to_identity_cc_sp, normalized_to_identity_cc_ip=normalized_to_identity_cc_ip, + dummy_block_references=dummy_block_references, + include_transactions=include_transactions, ) block_bytes_list: List[bytes] = [] for block in blocks: