diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py index eb3723be42f8..df851a8a072f 100644 --- a/tests/topotests/lib/pim.py +++ b/tests/topotests/lib/pim.py @@ -1745,6 +1745,49 @@ def verify_pim_rp_info( return True +@retry(retry_timeout=60, diag_pct=0) +def verify_pim_rp_info_is_empty(tgen, dut, af="ipv4"): + """ + Verify pim rp info by running "show ip pim rp-info" cli + + Parameters + ---------- + * `tgen`: topogen object + * `dut`: device under test + + Usage + ----- + dut = "r1" + result = verify_pim_rp_info_is_empty(tgen, dut) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + if dut not in tgen.routers(): + return False + + rnode = tgen.routers()[dut] + + ip_cmd = "ip" + if af == "ipv6": + ip_cmd = "ipv6" + + logger.info("[DUT: %s]: Verifying %s rp info", dut, ip_cmd) + cmd = "show {} pim rp-info json".format(ip_cmd) + show_ip_rp_info_json = run_frr_cmd(rnode, cmd, isjson=True) + + if show_ip_rp_info_json: + errormsg = "[DUT %s]: Verifying empty rp-info [FAILED]!!" % (dut) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + @retry(retry_timeout=60, diag_pct=0) def verify_pim_state( tgen, @@ -2751,6 +2794,48 @@ def scapy_send_bsr_raw_packet(tgen, topo, senderRouter, receiverRouter, packet=N return True +def scapy_send_autorp_raw_packet(tgen, senderRouter, senderInterface, packet=None): + """ + Using scapy Raw() method to send AutoRP raw packet from one FRR + to other + + Parameters: + ----------- + * `tgen` : Topogen object + * `senderRouter` : Sender router + * `senderInterface` : SenderInterface + * `packet` : AutoRP packet in raw format + + returns: + -------- + errormsg or True + """ + + global CWD + result = "" + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + python3_path = tgen.net.get_exec_path(["python3", "python"]) + # send_bsr_packet.py has no direct ties to bsr, just sends a raw packet out + # a given interface, so just reuse it + script_path = os.path.join(CWD, "send_bsr_packet.py") + node = tgen.net[senderRouter] + + cmd = [ + python3_path, + script_path, + packet, + senderInterface, + "--interval=1", + "--count=1", + ] + logger.info("Scapy cmd: \n %s", cmd) + node.cmd_raises(cmd) + + logger.debug("Exiting lib API: scapy_send_autorp_raw_packet") + return True + + def find_rp_from_bsrp_info(tgen, dut, bsr, grp=None): """ Find which RP is having lowest prioriy and returns rp IP @@ -4260,6 +4345,7 @@ def verify_local_igmp_groups(tgen, dut, interface, group_addresses): logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True + @retry(retry_timeout=62) def verify_static_groups(tgen, dut, interface, group_addresses): """ @@ -4293,7 +4379,9 @@ def verify_static_groups(tgen, dut, interface, group_addresses): rnode = tgen.routers()[dut] logger.info("[DUT: %s]: Verifying static groups received:", dut) - show_static_group_json = run_frr_cmd(rnode, "show ip igmp static-group json", isjson=True) + show_static_group_json = run_frr_cmd( + rnode, "show ip igmp static-group json", isjson=True + ) if type(group_addresses) is not list: group_addresses = [group_addresses] @@ -4330,6 +4418,7 @@ def verify_static_groups(tgen, dut, interface, group_addresses): logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True + def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type="ipv4"): """ Verify ip pim interface traffic by running diff --git a/tests/topotests/pim_autorp/__init__.py b/tests/topotests/pim_autorp/__init__.py new file mode 100755 index 000000000000..e69de29bb2d1 diff --git a/tests/topotests/pim_autorp/r1/frr.conf b/tests/topotests/pim_autorp/r1/frr.conf new file mode 100644 index 000000000000..2fddbc3ae29e --- /dev/null +++ b/tests/topotests/pim_autorp/r1/frr.conf @@ -0,0 +1,16 @@ +! +hostname r1 +password zebra +log file /tmp/r1-frr.log +debug pim autorp +! +interface r1-eth0 + ip address 10.10.76.1/24 + ip igmp + ip pim +! +ip forwarding +! +router pim + autorp discovery +! \ No newline at end of file diff --git a/tests/topotests/pim_autorp/r2/frr.conf b/tests/topotests/pim_autorp/r2/frr.conf new file mode 100644 index 000000000000..fd3c0cad3990 --- /dev/null +++ b/tests/topotests/pim_autorp/r2/frr.conf @@ -0,0 +1,16 @@ +! +hostname r2 +password zebra +log file /tmp/r2-frr.log +debug pim autorp +! +interface r2-eth0 + ip address 10.10.76.2/24 + ip igmp + ip pim +! +ip forwarding +! +router pim + autorp discovery +! \ No newline at end of file diff --git a/tests/topotests/pim_autorp/test_pim_autorp.py b/tests/topotests/pim_autorp/test_pim_autorp.py new file mode 100644 index 000000000000..5aecce942e7b --- /dev/null +++ b/tests/topotests/pim_autorp/test_pim_autorp.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python +# SPDX-License-Identifier: ISC + +# +# test_pim_autorp.py +# +# Copyright (c) 2024 ATCorp +# Nathan Bahr +# + +import os +import sys +import pytest + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from lib.topogen import Topogen, get_topogen +from lib.topolog import logger +from lib.pim import scapy_send_autorp_raw_packet, verify_pim_rp_info, verify_pim_rp_info_is_empty +from lib.common_config import step, write_test_header + +from time import sleep + +""" +test_pim_autorp.py: Test general PIM AutoRP functionality +""" + +TOPOLOGY = """ + Basic AutoRP functionality + + +---+---+ +---+---+ + | | 10.10.76.0/24 | | + + R1 + <------------------> + R2 | + | | .1 .2 | | + +---+---+ +---+---+ +""" + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) + +# Required to instantiate the topology builder class. +pytestmark = [pytest.mark.pimd] + + +def build_topo(tgen): + "Build function" + + # Create routers + tgen.add_router("r1") + tgen.add_router("r2") + + # Create link between router 1 and 2 + switch = tgen.add_switch("s1-2") + switch.add_link(tgen.gears["r1"]) + switch.add_link(tgen.gears["r2"]) + +def setup_module(mod): + logger.info("PIM AutoRP basic functionality:\n {}".format(TOPOLOGY)) + + tgen = Topogen(build_topo, mod.__name__) + tgen.start_topology() + + # Router 1 will be the router configured with "fake" autorp configuration, so give it a default route + # to router 2 so that routing to the RP address is not an issue + # r1_defrt_setup_cmds = [ + # "ip route add default via 10.10.76.1 dev r1-eth0", + # ] + # for cmd in r1_defrt_setup_cmds: + # tgen.net["r1"].cmd(cmd) + + logger.info("Testing PIM AutoRP support") + router_list = tgen.routers() + for rname, router in router_list.items(): + logger.info("Loading router %s" % rname) + router.load_frr_config(os.path.join(CWD, "{}/frr.conf".format(rname))) + + # Initialize all routers. + tgen.start_router() + for router in router_list.values(): + if router.has_version("<", "4.0"): + tgen.set_error("unsupported version") + + +def teardown_module(mod): + "Teardown the pytest environment" + tgen = get_topogen() + tgen.stop_topology() + +def test_pim_autorp_discovery_single_rp(request): + "Test PIM AutoRP Discovery with single RP" + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + step("Start with no RP configuration") + result = verify_pim_rp_info_is_empty(tgen, "r1") + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Send AutoRP packet from r1 to r2") + # 1 RP(s), hold time 5 secs, 10.10.76.1, group(s) 224.0.0.0/4 + data = "01005e00012800127f55cfb1080045c00030700c000008110abe0a0a4c01e000012801f001f0001c798b12010005000000000a0a4c0103010004e0000000" + scapy_send_autorp_raw_packet(tgen, "r1", "r1-eth0", data) + + step("Verify rp-info from AutoRP packet") + result = verify_pim_rp_info(tgen, None, "r2", "224.0.0.0/4", "r2-eth0", "10.10.76.1", "AutoRP", False, "ipv4", True) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Verify AutoRP configuration times out") + result = verify_pim_rp_info_is_empty(tgen, "r2") + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + +def test_pim_autorp_discovery_multiple_rp(request): + "Test PIM AutoRP Discovery with multiple RP's" + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + if tgen.routers_have_failure(): + pytest.skip("skipped because of router(s) failure") + + step("Start with no RP configuration") + result = verify_pim_rp_info_is_empty(tgen, "r2") + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Send AutoRP packet from r1 to r2") + # 2 RP(s), hold time 5 secs, 10.10.76.1, group(s) 224.0.0.0/8, 10.10.76.3, group(s) 225.0.0.0/8 + data = "01005e00012800127f55cfb1080045c0003c700c000008110ab20a0a4c01e000012801f001f000283f5712020005000000000a0a4c0103010008e00000000a0a4c0303010008e1000000" + scapy_send_autorp_raw_packet(tgen, "r1", "r1-eth0", data) + + step("Verify rp-info from AutoRP packet") + result = verify_pim_rp_info(tgen, None, "r2", "224.0.0.0/8", "r2-eth0", "10.10.76.1", "AutoRP", False, "ipv4", True) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + result = verify_pim_rp_info(tgen, None, "r2", "225.0.0.0/8", "r2-eth0", "10.10.76.3", "AutoRP", False, "ipv4", True) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + +def test_pim_autorp_discovery_static(request): + "Test PIM AutoRP Discovery with Static RP" + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + if tgen.routers_have_failure(): + pytest.skip("skipped because of router(s) failure") + + step("Start with no RP configuration") + result = verify_pim_rp_info_is_empty(tgen, "r2") + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Add static RP configuration to r2") + rnode = tgen.routers()["r2"] + rnode.cmd("vtysh -c 'conf t' -c 'router pim' -c 'rp 10.10.76.3 224.0.0.0/4'") + + step("Verify static rp-info from r2") + result = verify_pim_rp_info(tgen, None, "r2", "224.0.0.0/4", "r2-eth0", "10.10.76.3", "Static", False, "ipv4", True) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Send AutoRP packet from r1 to r2") + # 1 RP(s), hold time 5 secs, 10.10.76.1, group(s) 224.0.0.0/4 + data = "01005e00012800127f55cfb1080045c00030700c000008110abe0a0a4c01e000012801f001f0001c798b12010005000000000a0a4c0103010004e0000000" + scapy_send_autorp_raw_packet(tgen, "r1", "r1-eth0", data) + + step("Verify rp-info from AutoRP packet") + result = verify_pim_rp_info(tgen, None, "r2", "224.0.0.0/4", "r2-eth0", "10.10.76.1", "AutoRP", False, "ipv4", True) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + +def test_pim_autorp_announce_group(request): + "Test PIM AutoRP Announcement with a single group" + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + if tgen.routers_have_failure(): + pytest.skip("skipped because of router(s) failure") + + step("Add candidate RP configuration to r1") + rnode = tgen.routers()["r1"] + rnode.cmd("vtysh -c 'conf t' -c 'router pim' -c 'send-rp-announce 10.10.76.1 224.0.0.0/4'") + step("Verify Announcement sent data") + # TODO: Verify AutoRP mapping agent receives candidate RP announcement + # Mapping agent is not yet implemented + #sleep(10) + step("Change AutoRP Announcement packet parameters") + rnode.cmd("vtysh -c 'conf t' -c 'router pim' -c 'send-rp-announce scope 8 interval 10 holdtime 60'") + step("Verify Announcement sent data") + # TODO: Verify AutoRP mapping agent receives updated candidate RP announcement + # Mapping agent is not yet implemented + #sleep(10) + + +def test_memory_leak(): + "Run the memory leak test and report results." + tgen = get_topogen() + if not tgen.is_memleak_enabled(): + pytest.skip("Memory leak test/report is disabled") + + tgen.report_memory_leaks() + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args))