This repository has been archived by the owner on Dec 8, 2023. It is now read-only.


functional_test_framework.asciidoc


Functional Test Framework

This text presents the main concepts of the Bitcoin Functional Test Framework, shows the most common methods and how the tests are implemented through examples. The goal is to provide a solid foundation for readers to start studying and developing their own tests.

The commit b8593616dc2 can be used as a reference for the project’s codebase at the time of writing.

git clone https://github.com/bitcoin/bitcoin.git
cd bitcoin
git checkout -b text_branch b8593616dc2

A functional test is a test that is performed to confirm that the functionality of an application or system is behaving as expected. In Bitcoin Core, it is used to test interactions of the user and other nodes through RPC and P2P interfaces.

Functional tests exercise full features that span multiple layers of the stack (network, network processing, validation and so on). Two common targets, for example, are network (P2P) behavior and RPC commands, the interface through which the user interacts with Bitcoin Core.

The functional tests are located in the test/functional folder (not src/test). There are a few different categories (areas) of tests:

Area        Description
feature     tests for full features that aren’t wallet/mining/mempool, eg feature_rbf.py
interface   tests for other interfaces (REST, ZMQ, etc), eg interface_rest.py
mempool     tests for mempool behavior, eg mempool_reorg.py
mining      tests for mining features, eg mining_prioritisetransaction.py
rpc         tests for individual RPC methods or features, eg rpc_listtransactions.py
tool        tests for tools, eg tool_wallet.py
wallet      tests for wallet features, eg wallet_keypool.py

The tests can be run directly, with <filename> as shown below:

test/functional/rpc_getchaintips.py

Or indirectly through the test suite, implemented by test/functional/test_runner.py, as shown below. If no test file is passed as the parameter, all test files will be executed.

test/functional/test_runner.py test/functional/rpc_getchaintips.py
test/functional/test_runner.py test/functional/wallet*
test/functional/test_runner.py

The --jobs (or -j) option runs tests in parallel so that the suite finishes more quickly. Its default value is 4.

test/functional/test_runner.py -j 60

Other parameters are listed by the --help option. They can also be checked directly in the code:

def main():
    # ...
    parser.add_argument('--help', '-h', '-?', action='store_true', help='print help text and exit')
    parser.add_argument('--jobs', '-j', type=int, default=4, help='how many test scripts to run in parallel. Default=4.')
    parser.add_argument('--keepcache', '-k', action='store_true', help='the default behavior is to flush the cache directory on startup. --keepcache retains the cache from the previous testrun.')
    parser.add_argument('--quiet', '-q', action='store_true', help='only print dots, results summary and failure logs')
    # ...
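To see how such options behave, the following minimal sketch rebuilds just two of them in a stand-alone parser. The function name build_parser is hypothetical and exists only for this illustration; the real test_runner.py defines many more options.

```python
import argparse


def build_parser():
    """Build a tiny parser that mirrors two test_runner.py options (illustrative only)."""
    parser = argparse.ArgumentParser(description="sketch of test_runner.py option parsing")
    parser.add_argument('--jobs', '-j', type=int, default=4,
                        help='how many test scripts to run in parallel. Default=4.')
    parser.add_argument('--quiet', '-q', action='store_true',
                        help='only print dots, results summary and failure logs')
    return parser


# No flags given: --jobs falls back to its default.
default_args = build_parser().parse_args([])
# Equivalent to passing "-j 60 -q" on the command line.
custom_args = build_parser().parse_args(['-j', '60', '-q'])

print(default_args.jobs, default_args.quiet)
print(custom_args.jobs, custom_args.quiet)
```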

test/functional/test_framework/*

In the test/functional/test_framework folder, there are some files that implement useful functionality to help the developer write a test. Most of these functions are reused across many tests.

The table below describes some relevant files that make up the framework.

File                Description
util.py             Helpful routines for regression testing such as assert functions (e.g., assert_equal, assert_raises_rpc_error) and other helper functions (e.g., satoshi_round)
test_framework.py   Mainly BitcoinTestFramework, which is the base class for a bitcoin test script.
key.py              Test-only secp256k1 elliptic curve implementation (EllipticCurve, ECPubKey, ECKey, generate_privkey())
script.py           Functionality to build scripts, as well as signature hash functions (e.g., CScriptOp(int), OP_CHECKSIGVERIFY = CScriptOp(0xad), CScript)
blocktools.py       Utilities for manipulating blocks and transactions (e.g., create_block, create_tx_with_script).
p2p.py              Test objects for interacting with a bitcoind node over the p2p protocol (e.g., P2PInterface, P2PDataStore, P2PConnection)
messages.py         Definitions for objects passed over the network (CBlock, CTransaction, etc, along with the network-level wrappers for them, msg_block, msg_tx, etc).
test_node.py        A class for representing a bitcoind node under test (TestNode).

The p2p.py file was originally called mininode.py, a reference to the p2p interface objects used to connect to the bitcoind node. However, that name proved to be confusing for new contributors, so it was renamed to p2p.py in PR #19760.

BitcoinTestFramework


Every test is a subclass of BitcoinTestFramework, which sets up a chain in -regtest mode (meaning that it is not necessary to wait 10 minutes for a block) and any number of nodes to be used in the test.
Individual tests should override the set_test_params() and run_test() methods.

An attribute that is usually set in set_test_params() is self.num_nodes, which indicates how many nodes the test will use. A node is a bitcoind instance. Each bitcoind node is managed by a Python TestNode object, which is used to start/stop the node, manage the node’s data directory, read state about the node (e.g., process status, log file), and interact with the node over different interfaces.

The BitcoinTestFramework.main() method calls setup() and then run_test(). Note that shutdown() is called at the end of main() to handle the teardown, so that the tests do not have to worry about removing the nodes, closing down the network thread and cleaning up the used directories.

class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
    # ...
    def main(self):
        # ...
        assert hasattr(self, "num_nodes"), "Test must set self.num_nodes in set_test_params()"

        try:
            self.setup()
            self.run_test()
        except JSONRPCException:
            # ...
        finally:
            exit_code = self.shutdown()
            sys.exit(exit_code)

    def setup(self):
        # ...
        config = self.config

        fname_bitcoind = os.path.join(
            config["environment"]["BUILDDIR"],
            "src",
            "bitcoind" + config["environment"]["EXEEXT"],
        )
        fname_bitcoincli = os.path.join(
            config["environment"]["BUILDDIR"],
            "src",
            "bitcoin-cli" + config["environment"]["EXEEXT"],
        )
        self.options.bitcoind = os.getenv("BITCOIND", default=fname_bitcoind)
        self.options.bitcoincli = os.getenv("BITCOINCLI", default=fname_bitcoincli)
        # ...
        self.setup_chain()
        self.setup_network()
        # ...

    def run_test(self):
        """Tests must override this method to define test logic"""
        raise NotImplementedError
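The control flow of main() can be sketched without any Bitcoin code. The toy classes below (LifecycleSketch, PassingTest and FailingTest are hypothetical names) only record the order in which the hooks run. The sketch assumes a failing test raises AssertionError, and unlike the real main() it does not call sys.exit().

```python
class LifecycleSketch:
    """Toy base class that mimics the main() flow: setup, run_test, then shutdown."""

    def __init__(self):
        self.events = []  # records the order in which the hooks run

    def main(self):
        try:
            self.setup()
            self.run_test()
        except AssertionError as e:
            self.events.append(f"failed: {e}")
        finally:
            # Runs whether the test passed or failed, like shutdown() in the excerpt.
            self.shutdown()

    def setup(self):
        self.events.append("setup")

    def run_test(self):
        raise NotImplementedError

    def shutdown(self):
        self.events.append("shutdown")


class PassingTest(LifecycleSketch):
    def run_test(self):
        self.events.append("run_test")


class FailingTest(LifecycleSketch):
    def run_test(self):
        raise AssertionError("balance mismatch")


passing = PassingTest()
passing.main()
failing = FailingTest()
failing.main()
print(passing.events)
print(failing.events)
```

In both cases the last recorded event is the shutdown step, which is the property the finally clause provides.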

The setup() method resolves the paths to the bitcoind and bitcoin-cli binaries. These paths (and other parameters) can then be passed to TestNode. All the parameters supported by BitcoinTestFramework and TestNode can be found in the parse_args() method.

Other methods that individual tests can also override to customize the test setup are setup_chain(), setup_network() and setup_nodes().

setup_chain() calls _initialize_chain() to initialize a pre-mined blockchain for use by the test. It creates a cache of a 199-block-long chain and then creates num_nodes copies from the cache.

setup_nodes() calls add_nodes(self.num_nodes, …) to instantiate TestNode objects and then starts them. Each node runs on localhost and has its own port number. The configuration file with the specified port number is written by the standalone function util.py:write_config(). The start_nodes() method starts multiple bitcoind instances on different ports.

This entire process ensures that each node starts out with a few coins (a pre-mined chain of 200 blocks, counting the genesis block, loaded from the cache) and that the nodes are connected (by default, each node to the previous one). If the test needs to change the network topology, customize the node’s start behavior, or customize the node’s data directories, it can override any of those methods.

class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
    # ...
    def _initialize_chain(self):
        # ...
        for i in range(8):
            cache_node.generatetoaddress(
                nblocks=25 if i != 7 else 24,
                address=gen_addresses[i % 4],
            )

        assert_equal(cache_node.getblockchaininfo()["blocks"], 199)
        # ...
    # ...
    def setup_network(self):
        self.setup_nodes()

        for i in range(self.num_nodes - 1):
            self.connect_nodes(i + 1, i)
        self.sync_all()
    # ...
    def setup_nodes(self):
        # ...
        self.add_nodes(self.num_nodes, extra_args)
        self.start_nodes()
        # ....
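The block counts in the _initialize_chain() loop can be checked with plain arithmetic. The sketch below repeats only the loop bounds from the excerpt. It assumes, as is standard for getblockchaininfo(), that the reported height does not count the genesis block, which is how a height of 199 corresponds to a chain of 200 blocks.

```python
# Mirror the loop bounds of _initialize_chain(): eight batches, the last one a block shorter.
batches = [25 if i != 7 else 24 for i in range(8)]
cached_height = sum(batches)

# gen_addresses[i % 4] cycles through four addresses, so each one receives two batches.
blocks_per_address = [0, 0, 0, 0]
for i, nblocks in enumerate(batches):
    blocks_per_address[i % 4] += nblocks

print(cached_height)        # height checked by the assert_equal() in the excerpt
print(cached_height + 1)    # number of blocks once the genesis block is counted
print(blocks_per_address)
```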
Note
BitcoinTestMetaClass

Tests must override set_test_params() and run_test() but not __init__() or main(). If any of these rules is violated, a TypeError is raised. This behavior is enforced by the BitcoinTestMetaClass class, added in PR #12856.
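A metaclass can enforce rules like these at class-definition time. The following is a simplified sketch of the idea, not the framework's code: SketchMeta, SketchFramework, GoodTest and BadTest are hypothetical names, and the error messages are made up for the example.

```python
class SketchMeta(type):
    """Reject subclasses that break the override rules (simplified illustration)."""

    def __new__(mcs, clsname, bases, dct):
        if bases:  # only check subclasses, not the base class itself
            if not ('run_test' in dct and 'set_test_params' in dct):
                raise TypeError("subclasses must override 'run_test' and 'set_test_params'")
            if '__init__' in dct or 'main' in dct:
                raise TypeError("subclasses may not override '__init__' or 'main'")
        return super().__new__(mcs, clsname, bases, dct)


class SketchFramework(metaclass=SketchMeta):
    def main(self):
        self.set_test_params()
        self.run_test()


class GoodTest(SketchFramework):
    def set_test_params(self):
        self.num_nodes = 1

    def run_test(self):
        pass


def define_bad_test():
    """Try to define a subclass that overrides main(); return the error message."""
    try:
        class BadTest(SketchFramework):
            def set_test_params(self):
                pass

            def run_test(self):
                pass

            def main(self):
                pass
    except TypeError as e:
        return str(e)
    return None


GoodTest().main()
error_message = define_bad_test()
print(error_message)
```

Because the check runs when the class statement is executed, a test that breaks the rules fails as soon as the file is loaded, before any node is started.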

TestNode

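The TestNode class represents a bitcoind node for use in functional tests. It uses the binary that was compiled as bitcoind (don’t forget to run make before expecting changes to be reflected in functional tests). The class contains state about the node (such as whether it is running), the process object for the running bitcoind, an RPC connection to the node, and one or more P2P connections to the node.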

For the most part, TestNode and its interfaces (i.e., RPC or p2p connection) are used to verify the behavior of nodes.

The BitcoinTestFramework.connect_nodes() method, mentioned in the previous section, uses from_connection.addnode(ip_port, "onetry") to connect a TestNode object (from_connection) to a new peer, but the TestNode class has no addnode method. The explanation is that TestNode dispatches any unrecognized messages to the RPC connection. Therefore, since the addnode method does not exist, the call is handled as an RPC request to be sent to the node. This behavior is implemented in the __getattr__() method.

# test_framework.py
class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
    # ...
    def connect_nodes(self, a, b):
        def connect_nodes_helper(from_connection, node_num):
            ip_port = "127.0.0.1:" + str(p2p_port(node_num))
            from_connection.addnode(ip_port, "onetry")
            # ...
        # ...
    # ...
# test_node.py
class TestNode():
    def __getattr__(self, name):
        """Dispatches any unrecognised messages to the RPC connection or a CLI instance."""
        if self.use_cli:
            return getattr(RPCOverloadWrapper(self.cli, True, self.descriptors), name)
        else:
            assert self.rpc_connected and self.rpc is not None, self._node_msg("Error: no RPC connection")
            return getattr(RPCOverloadWrapper(self.rpc, descriptors=self.descriptors), name)
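The dispatch relies on a Python rule: __getattr__() is only consulted when normal attribute lookup fails. The sketch below shows the mechanism with a fake RPC object; FakeRPC and NodeSketch are hypothetical stand-ins, and the address string is just example data.

```python
class FakeRPC:
    """Stand-in for an RPC proxy: any attribute becomes a callable that records the call."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        def rpc_call(*args):
            self.calls.append((name, args))
            return f"rpc:{name}"
        return rpc_call


class NodeSketch:
    """Toy node wrapper: real methods run locally, everything else goes to the RPC proxy."""

    def __init__(self):
        self.rpc = FakeRPC()
        self.running = False

    def start(self):
        self.running = True  # a real method, so __getattr__ is never consulted

    def __getattr__(self, name):
        # Python only calls __getattr__ when normal attribute lookup fails.
        return getattr(self.rpc, name)


node = NodeSketch()
node.start()
result = node.addnode("127.0.0.1:18444", "onetry")
print(result)
print(node.rpc.calls)
```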

TestNode also implements common node operations such as start(), stop_node(), add_p2p_connection() and others.

If more control over the node is required (e.g. ignoring messages or introducing some specific malicious behavior), a P2PInterface is a better approach.

P2PInterface

P2PInterface allows a more customizable interaction with the node. It is a high-level P2P interface class for communicating with a Bitcoin node. Each connection to a node using this interface is managed by a python P2PInterface class or derived object (which is owned by the TestNode object).

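To add a new P2PInterface connection to a node, there are two methods that can be used: add_p2p_connection(), which adds an inbound connection, and add_outbound_p2p_connection(), which adds an outbound connection.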

Both methods add the new P2P connection to the TestNode.p2ps list of the node object.

P2PInterface also provides high-level callbacks for processing P2P message payloads, as well as convenience methods for interacting with the node over P2P.
Individual test cases should subclass this and override the on_* methods if they want to alter message handling behavior.
The code below shows this. Note that on_message() intercepts the message type and calls the on_[msg_type] method.

# test/functional/test_framework/p2p.py
class P2PInterface(P2PConnection):
    # ...
    def on_message(self, message):
        with p2p_lock:
            try:
                msgtype = message.msgtype.decode('ascii')
                self.message_count[msgtype] += 1
                self.last_message[msgtype] = message
                getattr(self, 'on_' + msgtype)(message)
            except:
                print("ERROR delivering %s (%s)" % (repr(message), sys.exc_info()[0]))
                raise

    def on_open(self): pass
    def on_close(self):pass
    def on_addr(self, message): pass
    def on_addrv2(self, message): pass
    def on_block(self, message): pass
    def on_blocktxn(self, message): pass
    # ...
    def on_tx(self, message): pass
    def on_wtxidrelay(self, message): pass
    # ...

As can be seen in the code, P2PInterface is a subclass of the P2PConnection, which implements low-level network operations, such as opening and closing the TCP connection to the node and reading bytes from and writing bytes to the socket.
This class contains no logic for handling the P2P message payloads. It must be subclassed and the on_message() callback must be overridden, as the P2PInterface class does.
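The callback dispatch used by on_message() can be reproduced in a few lines. In this sketch, Message, InterfaceSketch and TxCollector are hypothetical classes with no networking at all; they only show how getattr(self, 'on_' + msgtype) routes each message to a handler that a subclass may override.

```python
from collections import defaultdict


class Message:
    """Minimal message object; msgtype is bytes, as in the excerpt."""

    def __init__(self, msgtype, payload=None):
        self.msgtype = msgtype
        self.payload = payload


class InterfaceSketch:
    """Toy version of the dispatch: route each message to on_<msgtype>."""

    def __init__(self):
        self.message_count = defaultdict(int)
        self.last_message = {}

    def on_message(self, message):
        msgtype = message.msgtype.decode('ascii')
        self.message_count[msgtype] += 1
        self.last_message[msgtype] = message
        getattr(self, 'on_' + msgtype)(message)

    # Default handlers do nothing.
    def on_ping(self, message): pass
    def on_tx(self, message): pass


class TxCollector(InterfaceSketch):
    """Subclass that overrides a single callback to change the handling of tx messages."""

    def __init__(self):
        super().__init__()
        self.seen = []

    def on_tx(self, message):
        self.seen.append(message.payload)


peer = TxCollector()
peer.on_message(Message(b'ping'))
peer.on_message(Message(b'tx', payload='tx-1'))
peer.on_message(Message(b'tx', payload='tx-2'))
print(dict(peer.message_count))
print(peer.seen)
```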

There are also two other classes:

  • P2PDataStore: A P2PInterface subclass that keeps a store of transactions and blocks and can respond correctly to getdata and getheaders messages

  • P2PTxInvStore: A P2PInterface subclass which stores a count of how many times each txid has been announced.

These two classes are generally used in some mempool, transaction and block tests. But P2PInterface is used much more frequently.

The diagram below shows the most relevant Test Framework classes.

Figure 1. Test Framework Classes

Test 1 - Sending Money

Let’s create some simple tests to see the test framework in action. A basic but important test is to check if a node is able to send money to another.

The code below implements this test.

#!/usr/bin/env python3
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
    assert_equal,
    assert_greater_than
)

class WalletSendTest(BitcoinTestFramework):
    def set_test_params(self):
        self.num_nodes = 2
        self.setup_clean_chain = True

    def skip_test_if_missing_module(self):
        self.skip_if_no_wallet()

    def run_test(self):

        assert_equal(self.nodes[0].getbalance(), 0)
        assert_equal(self.nodes[1].getbalance(), 0)

        assert_equal(len(self.nodes[0].listunspent()), 0)
        assert_equal(len(self.nodes[1].listunspent()), 0)

        self.nodes[0].generate(101)

        n1_receive = self.nodes[1].getnewaddress()
        self.nodes[0].sendtoaddress(n1_receive, 30)

        self.nodes[0].generate(1)
        self.sync_blocks()

        assert_greater_than(self.nodes[0].getbalance(), 50)
        assert_equal(self.nodes[1].getbalance(), 30)

        assert_equal(len(self.nodes[0].listunspent()), 2)
        assert_equal(len(self.nodes[1].listunspent()), 1)

if __name__ == '__main__':
    WalletSendTest().main()

BitcoinTestFramework, as previously mentioned, is the base class for all functional tests. The first thing to do is to create the subclass and then implement the set_test_params() and the run_test() methods.

In set_test_params(), the num_nodes must be defined. As the name implies, it specifies the number of nodes the test will use. This test uses two nodes (self.num_nodes = 2).

The next line is self.setup_clean_chain = True. By default, every test loads a pre-mined chain of 200 blocks from the cache, so the node will start the test with some money and be able to spend it. By setting setup_clean_chain to True, the chain will start with an empty blockchain, with no pre-mined blocks. It is useful if a test case wants complete control over initialization.

The default behavior is setup_clean_chain: bool = False, as can be seen in the code below. Therefore, to start with an empty blockchain, this property needs to be explicitly changed in the set_test_params() method.

The method which initializes an empty blockchain is the _initialize_chain_clean() while the _initialize_chain() builds a cache of a 199-block-long chain. The latter method was mentioned in the BitcoinTestFramework section.

class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
    def __init__(self):
        self.chain: str = 'regtest'
        self.setup_clean_chain: bool = False
        # ...
    # ...
    def setup_chain(self):
        """Override this method to customize blockchain setup"""
        self.log.info("Initializing test directory " + self.options.tmpdir)
        if self.setup_clean_chain:
            self._initialize_chain_clean()
        else:
            self._initialize_chain()
    # ...
    def _initialize_chain_clean(self):
        for i in range(self.num_nodes):
            initialize_datadir(self.options.tmpdir, i, self.chain)

The skip_test_if_missing_module() method is used to skip the test when a module it requires is not present. In this case, the test uses RPC functions that require a wallet, such as getbalance(), listunspent(), getnewaddress() and sendtoaddress().

skip_if_no_wallet() will skip the test if bitcoind was compiled without wallet support (./configure --disable-wallet).
Otherwise, it will ensure the creation of a default wallet. For this reason, the nodes of this test are able to directly access the funds without specifying a wallet (since v0.21, Bitcoin Core no longer creates a default wallet).

class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
    # ...
    def setup_nodes(self):
        # ...
        if self.requires_wallet:
            self.import_deterministic_coinbase_privkeys()
        # ...
    def import_deterministic_coinbase_privkeys(self):
        for i in range(self.num_nodes):
            self.init_wallet(i)

    def init_wallet(self, i):
        wallet_name = self.default_wallet_name if self.wallet_names is None else self.wallet_names[i] if i < len(self.wallet_names) else False
        if wallet_name is not False:
            n = self.nodes[i]
            if wallet_name is not None:
                n.createwallet(wallet_name=wallet_name, descriptors=self.options.descriptors, load_on_startup=True)
            n.importprivkey(privkey=n.get_deterministic_priv_key().key, label='coinbase')
    # ...
    def skip_if_no_wallet(self):
        """Skip the running test if wallet has not been compiled."""
        self.requires_wallet = True
        if not self.is_wallet_compiled():
            raise SkipTest("wallet has not been compiled.")
        # ...
    # ...
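The chained conditional expression in init_wallet() is compact and easy to misread. The sketch below spells it out as separate branches; choose_wallet_name is a hypothetical helper, and the wallet names are made up for the example.

```python
def choose_wallet_name(i, default_wallet_name, wallet_names):
    """Spell out the conditional expression from init_wallet() step by step."""
    if wallet_names is None:
        return default_wallet_name   # no per-node list: every node uses the default
    if i < len(wallet_names):
        return wallet_names[i]       # the list provides an entry for this node
    return False                     # list too short: node i gets no wallet


# "default_wallet", "alice" and "bob" are hypothetical names used only for this example.
print(choose_wallet_name(0, "default_wallet", None))
print(choose_wallet_name(1, "default_wallet", ["alice", "bob"]))
print(choose_wallet_name(2, "default_wallet", ["alice", "bob"]))
```

In the excerpt, a result of False skips the node entirely, while an entry of None skips createwallet() but still imports the coinbase key.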

If skip_if_no_wallet() is not called, the test must create a wallet before using wallet operations, as shown below:

self.nodes[0].createwallet(wallet_name="w0")
wallet_node_0 = self.nodes[0].get_wallet_rpc("w0")
address = wallet_node_0.getnewaddress()

There are other skip_if_no_*() functions in the BitcoinTestFramework class, such as skip_if_no_sqlite(), skip_if_no_bdb(), skip_if_no_bitcoind_zmq() and so on. The developer should check these methods if the test depends on a module that is optional when compiling bitcoind.

The next step in the test of sending money is the run_test() method, which implements the test.
It starts by checking that the balance of each node is zero (assert_equal(self.nodes[0].getbalance(), 0)). Note that getbalance() is an RPC command. The next validation (assert_equal(len(self.nodes[0].listunspent()), 0)) is not really necessary since the node balances have already been verified, but it is there for demonstration purposes.

The TestNode.generate() method uses the generatetoaddress RPC to mine new blocks immediately to a node address. A pattern that recurs in the tests is the generation of 101 blocks. This is due to the COINBASE_MATURITY consensus rule, defined in src/consensus/consensus.h, which means that coinbase transaction outputs can only be spent after a specific number of new blocks. At the moment, the number is 100. Therefore, when generating 101 blocks, the miner can spend the reward of 1 block (the first one that was generated).

This explains the line self.nodes[0].generate(101).
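The counting can be written down as a one-line function. This is a sketch of the rule as described in the text (a coinbase is treated as spendable once 100 further blocks have been mined on top of its block), assuming a single miner on a clean chain; spendable_coinbases is a hypothetical helper, not part of the framework.

```python
COINBASE_MATURITY = 100  # value quoted in the text


def spendable_coinbases(blocks_mined, maturity=COINBASE_MATURITY):
    """Count the coinbases that already have `maturity` blocks mined on top of them."""
    return max(0, blocks_mined - maturity)


for blocks in (1, 100, 101, 102):
    print(blocks, spendable_coinbases(blocks))
```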

Next, the second node generates a new address and the first node sends 30 BTC to it. But at this moment, the transaction exists only in the mempool. Then, the first node mines another block to settle the transaction.

After that, the sync_blocks() method is called. It waits until all nodes have the same tip. This is another method that is used quite often, usually after generate(), to wait for block propagation.

Then, the test checks that the second node received 30 BTC and that the balance of the first node is greater than 50 BTC, since it received the block reward.

The first node should also have 2 UTXOs (the change output and a block reward) and the second only one UTXO (the received money).
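The final assertions can be traced with simple bookkeeping. The sketch below works in satoshis and assumes a 50 BTC block reward (which the balance check in the test implies) and a made-up fee. It models only mature, spendable outputs and is not wallet code.

```python
COIN = 100_000_000          # satoshis per BTC
BLOCK_REWARD = 50 * COIN    # assumed reward per block in this test
FEE = 2_000                 # hypothetical fee in satoshis; the real fee depends on the wallet

# After generate(101): only the first coinbase is mature, so node 0 holds one spendable UTXO.
node0_utxos = [BLOCK_REWARD]
node1_utxos = []

# sendtoaddress(n1_receive, 30): spend the coinbase, pay node 1, keep the change.
amount = 30 * COIN
spent = node0_utxos.pop()
node1_utxos.append(amount)
node0_utxos.append(spent - amount - FEE)  # change output

# generate(1): the payment confirms and the second coinbase matures.
node0_utxos.append(BLOCK_REWARD)

print(len(node0_utxos), sum(node0_utxos) / COIN)
print(len(node1_utxos), sum(node1_utxos) / COIN)
```

Under these assumptions node 0 ends with two spendable outputs worth a little less than 70 BTC, which satisfies the "greater than 50" check whatever the exact fee.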

More wallet tests can be found at test/functional/wallet_*.py. Two good tests to start with are wallet_basic.py and wallet_send.py.

Test 2 - Expected Mempool Behavior

The following code is a simple test that demonstrates basic mempool behavior and some common mempool test functions.

#!/usr/bin/env python3
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
    assert_equal,
    assert_greater_than
)

class MempoolSimpleTest(BitcoinTestFramework):
    def set_test_params(self):
        self.num_nodes = 2

    def skip_test_if_missing_module(self):
        self.skip_if_no_wallet()

    def run_test(self):
        assert_greater_than(self.nodes[0].getbalance(), 30)

        assert_equal(self.nodes[0].getmempoolinfo()["size"], 0)
        assert_equal(self.nodes[0].getmempoolinfo()["unbroadcastcount"], 0)

        n1_receive = self.nodes[1].getnewaddress()
        txid = self.nodes[0].sendtoaddress(n1_receive, 30)

        assert_equal(self.nodes[0].getmempoolinfo()["size"], 1)
        assert_equal(self.nodes[0].getmempoolinfo()["unbroadcastcount"], 1)
        assert txid in self.nodes[0].getrawmempool()
        assert txid not in self.nodes[1].getrawmempool()

        self.sync_mempools()

        assert_equal(self.nodes[0].getmempoolinfo()["unbroadcastcount"], 0)

        assert_equal(self.nodes[1].getmempoolinfo()["size"], 1)
        assert_equal(self.nodes[1].getmempoolinfo()["unbroadcastcount"], 0)
        assert txid in self.nodes[1].getrawmempool()

        self.nodes[0].generate(1)

        self.sync_blocks()

        assert txid not in self.nodes[0].getrawmempool()
        assert txid not in self.nodes[1].getrawmempool()

if __name__ == "__main__":
    MempoolSimpleTest().main()

The first steps are basically the same as in the previous example: declare a subclass of BitcoinTestFramework, set the number of the nodes in set_test_params() and if the test uses wallets, call skip_if_no_wallet() in skip_test_if_missing_module(). Then write the test in run_test().

The main difference from the previous example, however, is that setup_clean_chain = True is not present. This setting is only necessary when the test requires complete control over initialization. For this test, spending the coinbase transaction outputs with which the nodes start is sufficient.

The first line ensures the first node has more than 30 BTC available. The second line introduces the RPC command getmempoolinfo(), which returns details on the active state of the transaction memory pool.

The relevant details for this test are size, which represents the current transaction count, and unbroadcastcount, which shows the current number of transactions that have not been broadcast yet.

The second and third lines ensure that the node’s mempool starts empty. The fourth and fifth lines create a transaction from node 0 to node 1. It is very similar to how it was done in the previous example, but this time, the test captures the transaction ID to check if it exists in the mempool.

The next lines confirm that node 0 (which created the transaction) has the transaction in its mempool, whereas node 1 does not, since the transaction has not been propagated yet.

This is done by verifying that the mempool of node 0 has a size of 1 and contains one unbroadcast transaction.

An interesting way to check if a mempool contains a specific transaction is through the RPC command getrawmempool(), which returns all transaction ids in the memory pool as an array. Then, check that the array contains the transaction being searched.

The line assert txid in self.nodes[0].getrawmempool() does this.

The command self.sync_mempools() waits until all nodes have the same transactions in their memory pools.

Afterward, with the mempools synchronized, the checks are repeated to ensure that the mempool of node 1 contains the same transaction as the mempool of node 0.

self.sync_blocks() has already been seen in the previous example, but what matters here is that the transaction must be removed from mempool after being included in a block.

The last two lines do this check.

This example showed some important functions that are commonly used in the mempool tests.

More mempool tests can be found at test/functional/mempool_*.py. Two good tests to start with are mempool_accept.py and mempool_spend_coinbase.py.

Note
Sync* Functions

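The BitcoinTestFramework class has three syncing functions: sync_blocks(), which waits until all nodes have the same tip; sync_mempools(), which waits until all nodes have the same transactions in their mempools; and sync_all(), which does both.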
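The conditions that sync_blocks(), sync_mempools() and sync_all() wait for can be sketched as simple predicates over fake nodes. FakeNode and the three functions below are hypothetical; the framework's helpers query the nodes over RPC and keep polling until the condition holds or a timeout is reached.

```python
class FakeNode:
    """Hypothetical node holding only a chain tip and a set of mempool txids."""

    def __init__(self, tip, mempool):
        self.tip = tip
        self.mempool = set(mempool)


def blocks_synced(nodes):
    """True when every node reports the same chain tip."""
    return len({node.tip for node in nodes}) == 1


def mempools_synced(nodes):
    """True when every node holds the same set of transactions."""
    return all(node.mempool == nodes[0].mempool for node in nodes)


def all_synced(nodes):
    """Both conditions together, which is what sync_all() waits for."""
    return blocks_synced(nodes) and mempools_synced(nodes)


nodes = [FakeNode("tip-a", {"tx1"}), FakeNode("tip-a", set())]
state_before = (blocks_synced(nodes), mempools_synced(nodes), all_synced(nodes))

nodes[1].mempool.add("tx1")   # the transaction reaches the second node
state_after = (blocks_synced(nodes), mempools_synced(nodes), all_synced(nodes))

print(state_before)
print(state_after)
```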

Test 3 - Adding P2PInterface Connections

The code below is a simplified version of test/functional/p2p_add_connections.py. It shows how to add P2P connections and validate them.

#!/usr/bin/env python3
from test_framework.p2p import P2PInterface
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal


def check_node_connections(*, node, num_in, num_out):
    info = node.getnetworkinfo()
    assert_equal(info["connections_in"], num_in)
    assert_equal(info["connections_out"], num_out)


class P2PAddConnections(BitcoinTestFramework):
    def set_test_params(self):
        self.num_nodes = 2

    def setup_network(self):
        self.setup_nodes()

    def run_test(self):
        self.log.info("Add 8 outbounds to node 0")
        for i in range(8):
            self.log.info(f"outbound: {i}")
            self.nodes[0].add_outbound_p2p_connection(
                P2PInterface(), p2p_idx=i, connection_type="outbound-full-relay")

        self.log.info("Add 2 block-relay-only connections to node 0")
        for i in range(2):
            self.log.info(f"block-relay-only: {i}")
            self.nodes[0].add_outbound_p2p_connection(
                P2PInterface(), p2p_idx=i + 8, connection_type="block-relay-only")

        self.log.info("Add 5 inbound connections to node 1")
        for i in range(5):
            self.log.info(f"inbound: {i}")
            self.nodes[1].add_p2p_connection(P2PInterface())

        self.log.info("Check the connections opened as expected")
        check_node_connections(node=self.nodes[0], num_in=0, num_out=10)
        check_node_connections(node=self.nodes[1], num_in=5, num_out=0)

        self.log.info("Disconnect p2p connections")
        self.nodes[0].disconnect_p2ps()
        check_node_connections(node=self.nodes[0], num_in=0, num_out=0)


if __name__ == '__main__':
    P2PAddConnections().main()

The check_node_connections() function gets the result of the getnetworkinfo() RPC command, which retrieves network information, including the number of inbound connections (connections_in) and the number of outbound connections (connections_out).

It then verifies that the numbers returned by RPC command are the same as those passed as parameters, which are the number of the connections opened manually.

The test class overrides the setup_network() method. The default implementation connects all the nodes, whereas this test adds the connections manually. setup_nodes() starts the nodes and initializes the wallet (if enabled) but does not connect the nodes to each other.

self.log.info() is a method used quite frequently in the tests. It comes from Python’s logging module. It is used to describe the test and make the intention of the developer clear at each step. It should be used as much as necessary to ensure a good understanding of the test.

Both add_outbound_p2p_connection() and add_p2p_connection() (which adds an inbound connection) receive a P2PInterface object as a parameter.
If the connection is of the outbound type, there is one more parameter (connection_type) to define if the connection type is outbound-full-relay or block-relay-only.

To close the P2P connections of a node, disconnect_p2ps() can be used.

Some tests require the P2PInterface connections to handle one or more message types. This should be done by creating a subclass that overrides the corresponding on_* methods to provide custom message handling behavior, as seen in the P2PInterface section.

A good example of this approach is test/functional/p2p_addrv2_relay.py.
AddrReceiver is a P2PInterface subclass that overrides the on_addrv2() method to add custom handling for the addrv2 message type.

Note
Addr v2 Message Type

addr v2 is a new version of the addr message in the Bitcoin P2P network protocol, which is used to advertise the addresses of nodes that accept incoming connections. It was proposed in BIP 155. It adds support for v3 Tor hidden service addresses and other privacy-enhancing network protocols.

# ...
class AddrReceiver(P2PInterface):
    addrv2_received_and_checked = False

    def __init__(self):
        super().__init__(support_addrv2 = True)

    def on_addrv2(self, message):
        for addr in message.addrs:
            assert_equal(addr.nServices, 9)
            assert addr.ip.startswith('123.123.123.')
            assert (8333 <= addr.port < 8343)
        self.addrv2_received_and_checked = True

    def wait_for_addrv2(self):
        self.wait_until(lambda: "addrv2" in self.last_message)

# ...
class AddrTest(BitcoinTestFramework):
    # ...
    def run_test(self):
        # ...
        self.log.info(
            'Check that addrv2 message content is relayed and added to addrman')
        addr_receiver = self.nodes[0].add_p2p_connection(AddrReceiver())
        msg.addrs = ADDRS
        with self.nodes[0].assert_debug_log([
                'Added 10 addresses from 127.0.0.1: 0 tried',
                'received: addrv2 (131 bytes) peer=0',
                'sending addrv2 (131 bytes) peer=1',
        ]):
            addr_source.send_and_ping(msg)
            self.nodes[0].setmocktime(int(time.time()) + 30 * 60)
            addr_receiver.wait_for_addrv2()
        # ...

In the code above, the AddrReceiver class checks that every address received in addrv2 messages has the expected format. This is done in the on_addrv2() method, which implements the addrv2 handling.
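The three checks in on_addrv2() can be tried out on plain data. In the sketch below, AddrSketch is a hypothetical stand-in for one entry of message.addrs and addr_is_expected is a made-up helper. The constants reflect the P2P service bits NODE_NETWORK (bit 0) and NODE_WITNESS (bit 3), whose combination gives the value 9 used in the test.

```python
from dataclasses import dataclass

NODE_NETWORK = 1 << 0   # service bit 0
NODE_WITNESS = 1 << 3   # service bit 3


@dataclass
class AddrSketch:
    """Hypothetical stand-in for one entry of message.addrs."""
    nServices: int
    ip: str
    port: int


def addr_is_expected(addr):
    """Apply the same three checks as on_addrv2(), returning a bool instead of asserting."""
    return (
        addr.nServices == (NODE_NETWORK | NODE_WITNESS)   # equals 9
        and addr.ip.startswith('123.123.123.')
        and 8333 <= addr.port < 8343
    )


good = AddrSketch(nServices=9, ip='123.123.123.4', port=8337)
bad_port = AddrSketch(nServices=9, ip='123.123.123.4', port=8343)
print(addr_is_expected(good), addr_is_expected(bad_port))
```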

But there are more interesting details in this test.

assert_debug_log() is a function that checks whether new entries have been added to the debug.log file and whether these entries match the text passed as a parameter.

When multiple addresses are added, the message "Added %i addresses from …​" is recorded in the log.
When the node receives a message, the message type and its size are recorded in the log. The same applies when sending a message.

send_and_ping(msg) is a P2PInterface method that sends a specific message (msg) to the node. In this case, the P2P interface is sending an ADDRV2 message with 10 addresses to the node.

class P2PInterface(P2PConnection):
    # ...
    # Message sending helper functions

    def send_and_ping(self, message, timeout=60):
        self.send_message(message)
        self.sync_with_ping(timeout=timeout)

    # Sync up with the node
    def sync_with_ping(self, timeout=60):
        self.send_message(msg_ping(nonce=self.ping_counter))

        def test_function():
            return self.last_message.get("pong") and self.last_message["pong"].nonce == self.ping_counter

        self.wait_until(test_function, timeout=timeout)
        self.ping_counter += 1
    # ...

After sending the ADDRV2 message, the P2P interface calls sync_with_ping() to send a ping message to the node, and then waits to receive a pong before proceeding. The reason is to ensure the node processed the message.

Nodes always respond to ping with pong and nodes process their messages from a single peer in the order in which they were received. In other words: if the P2P interface has gotten the pong back, it is known for a fact that all previous messages have been processed.

Therefore, in this case, if the P2P interface receives pong, it means the previous message (ADDRV2) was received and processed.
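The ordering argument can be illustrated with a queue. FakeNodeQueue below is a hypothetical model of a node that handles one peer's messages strictly in arrival order; it is not how bitcoind is implemented, but it shows why a pong implies that everything sent before the ping has been handled.

```python
from collections import deque


class FakeNodeQueue:
    """Hypothetical node that handles one peer's messages strictly in arrival order."""

    def __init__(self):
        self.inbox = deque()
        self.processed = []

    def receive(self, msgtype, payload=None):
        self.inbox.append((msgtype, payload))

    def process_all(self):
        """Process queued messages in order and return the replies that were produced."""
        replies = []
        while self.inbox:
            msgtype, payload = self.inbox.popleft()
            self.processed.append(msgtype)
            if msgtype == "ping":
                replies.append(("pong", payload))  # the pong echoes the ping nonce
        return replies


node = FakeNodeQueue()
node.receive("addrv2", ["addr-1", "addr-2"])
node.receive("ping", 7)
replies = node.process_all()

# The pong for nonce 7 exists only after addrv2 was handled.
print(replies)
print(node.processed)
```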

setmocktime() is an RPC command for -regtest mode only and is widely used in functional testing. It sets the local time of the node to a given timestamp. The sending of addresses to peers is controlled by a random delay timer (called m_next_addr_send) to improve privacy. Thus, the time of the node is advanced by half an hour to ensure that the timer has expired and addresses are allowed to be sent.
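The effect of advancing mock time on a delay timer can be sketched with a settable clock. MockClock and may_send_addresses are hypothetical, and the 45-second delay is an arbitrary stand-in for whatever value the random timer happens to draw.

```python
class MockClock:
    """Hypothetical clock whose time can be set directly, like a node under setmocktime()."""

    def __init__(self, now):
        self.now = now

    def set_time(self, timestamp):
        self.now = timestamp


def may_send_addresses(clock, next_send_time):
    """The timer has expired once the clock reaches the scheduled send time."""
    return clock.now >= next_send_time


start = 1_700_000_000                 # arbitrary example timestamp
clock = MockClock(start)
next_send_time = start + 45           # hypothetical delay drawn by the timer, in seconds

before = may_send_addresses(clock, next_send_time)
clock.set_time(start + 30 * 60)       # jump half an hour ahead, as the test does
after = may_send_addresses(clock, next_send_time)
print(before, after)
```

Jumping far past any plausible delay means the test does not have to sleep or guess the value the timer drew.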

And finally, the wait_for_addrv2() method is basically a wrapper for self.wait_until(lambda: "addrv2" in self.last_message).

self.wait_until(…) makes the test wait for an arbitrary predicate to evaluate to True. In the code above, it waits until an addrv2 message has been received, that is, until last_message contains an addrv2 entry.
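A polling helper of this kind takes only a few lines. The sketch below is a generic version under assumed defaults (wait_until_sketch, its timeout and its poll interval are all hypothetical), not the framework's implementation.

```python
import time


def wait_until_sketch(predicate, timeout=2.0, poll_interval=0.05):
    """Poll predicate() until it returns a truthy value or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return
        time.sleep(poll_interval)
    raise AssertionError(f"predicate not true after {timeout} seconds")


last_message = {}
polls = {"count": 0}


def addrv2_arrived():
    """Simulate a message that shows up on the third poll."""
    polls["count"] += 1
    if polls["count"] == 3:
        last_message["addrv2"] = "payload"
    return "addrv2" in last_message


wait_until_sketch(addrv2_arrived)
print(polls["count"], last_message)

# A predicate that never becomes true ends in an error instead of hanging forever.
try:
    wait_until_sketch(lambda: False, timeout=0.2)
    timed_out = False
except AssertionError:
    timed_out = True
print(timed_out)
```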

But the test does not always need to implement its own predicate. There are already many wait_for_*() functions implemented. If the test needs to wait for a transaction, for example, it should use wait_for_tx(). There is no need to reinvent the wheel. Other examples of these functions are wait_for_block(), wait_for_merkleblock(), wait_for_header() and so on.

Note that most of these functions use wait_until(…). Therefore, the developer should only use wait_until(…) directly if there is no wait_for_*() function for the intended check.

class P2PInterface(P2PConnection):
    # ....
    # Message receiving helper methods

    def wait_for_tx(self, txid, timeout=60):
        def test_function():
            if not self.last_message.get('tx'):
                return False
            return self.last_message['tx'].tx.rehash() == txid

        self.wait_until(test_function, timeout=timeout)

    def wait_for_block(self, blockhash, timeout=60):
        def test_function():
            return self.last_message.get("block") and self.last_message["block"].block.rehash() == blockhash

        self.wait_until(test_function, timeout=timeout)
    # ...

Summary

The Bitcoin Functional Test Framework has three main classes: BitcoinTestFramework, TestNode and P2PInterface.

The BitcoinTestFramework class is a base class for all functional tests. TestNode represents a bitcoind node for use in functional tests. P2PInterface allows a more customizable interaction with the node.

The set_test_params() and run_test() methods should be overridden to implement the test, and self.num_nodes sets the number of nodes that will be used in the test.

By default, every test loads a pre-mined chain of 200 blocks from the cache, but if self.setup_clean_chain is True, an empty chain will be loaded.

skip_test_if_missing_module() is used to skip the test when a module it requires is not present. The skip_if_no_*() methods should be called if the test depends on a module that is optional when compiling bitcoind.

sync_blocks() waits for all nodes to have the same tip and sync_mempools() waits for all nodes to have the same transactions in their mempools. sync_all() does both.

P2PInterface connections are added to a node with add_outbound_p2p_connection() and add_p2p_connection(). The test can create subclasses of P2PInterface to handle specific message types.

wait_for_*() and wait_until(…) block until the expected behavior has occurred.
