Skip to content

Commit

Permalink
Updated tests: fixed bugs vrf all
Browse files Browse the repository at this point in the history
  • Loading branch information
vitthalmagadum committed Jan 22, 2025
1 parent 0e4e2b6 commit 9f92ab1
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 191 deletions.
2 changes: 1 addition & 1 deletion anta/input_models/routing/isis.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023-2024 Arista Networks, Inc.
# Copyright (c) 2023-2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Module containing input models for routing ISIS tests."""
Expand Down
171 changes: 59 additions & 112 deletions anta/tests/routing/isis.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,76 +18,6 @@
from anta.tools import get_value


def _get_isis_neighbor_details(isis_neighbor_json: dict[str, Any], neighbor_state: Literal["up", "down"] | None = None) -> list[dict[str, Any]]:
"""Return the list of isis neighbors.
Parameters
----------
isis_neighbor_json
The JSON output of the `show isis neighbors` command.
neighbor_state
Value of the neihbor state we are looking for. Defaults to `None`.
Returns
-------
list[dict[str, Any]]
A list of isis neighbors.
"""
return [
{
"vrf": vrf,
"instance": instance,
"neighbor": adjacency["hostname"],
"neighbor_address": adjacency["routerIdV4"],
"interface": adjacency["interfaceName"],
"state": adjacency["state"],
}
for vrf, vrf_data in isis_neighbor_json["vrfs"].items()
for instance, instance_data in vrf_data.get("isisInstances", {}).items()
for neighbor, neighbor_data in instance_data.get("neighbors", {}).items()
for adjacency in neighbor_data.get("adjacencies", [])
if neighbor_state is None or adjacency["state"] == neighbor_state
]


def _get_isis_interface_details(isis_neighbor_json: dict[str, Any]) -> list[dict[str, Any]]:
"""Return the isis interface details.
Parameters
----------
isis_neighbor_json
The JSON output of the `show isis interface brief` command.
Returns
-------
dict[str, Any]]
A dict of isis interfaces.
"""
return [
{"vrf": vrf, "interface": interface, "mode": mode, "count": int(level_data["numAdjacencies"]), "level": int(level)}
for vrf, vrf_data in isis_neighbor_json["vrfs"].items()
for instance, instance_data in vrf_data.get("isisInstances").items()
for interface, interface_data in instance_data.get("interfaces").items()
for level, level_data in interface_data.get("intfLevels").items()
if (mode := level_data["passive"]) is not True
]


def _get_interface_data(interface: str, vrf: str, command_output: dict[str, Any]) -> dict[str, Any] | None:
"""Extract data related to an IS-IS interface for testing."""
if (vrf_data := get_value(command_output, f"vrfs.{vrf}")) is None:
return None

for instance_data in vrf_data.get("isisInstances").values():
if (intf_dict := get_value(dictionary=instance_data, key="interfaces")) is not None:
try:
return next(ifl_data for ifl, ifl_data in intf_dict.items() if ifl == interface)
except StopIteration:
return None
return None


def _get_adjacency_segment_data_by_neighbor(neighbor: str, instance: str, vrf: str, command_output: dict[str, Any]) -> dict[str, Any] | None:
"""Extract data related to an IS-IS interface for testing."""
search_path = f"vrfs.{vrf}.isisInstances.{instance}.adjacencySegments"
Expand All @@ -103,12 +33,12 @@ def _get_adjacency_segment_data_by_neighbor(neighbor: str, instance: str, vrf: s


class VerifyISISNeighborState(AntaTest):
"""Verifies all IS-IS neighbors are in UP state.
"""Verifies the health of all the IS-IS neighbors.
Expected Results
----------------
* Success: The test will pass if all IS-IS neighbors are in UP state.
* Failure: The test will fail if any IS-IS neighbor adjance session is down.
* Failure: The test will fail if any IS-IS neighbor adjacency is down.
* Skipped: The test will be skipped if no IS-IS neighbor is found.
Examples
Expand All @@ -117,28 +47,40 @@ class VerifyISISNeighborState(AntaTest):
anta.tests.routing:
isis:
- VerifyISISNeighborState:
check_all_vrfs: bool = False
```
"""

categories: ClassVar[list[str]] = ["isis"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show isis neighbors", revision=1)]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show isis neighbors vrf all", revision=1)]

class Input(AntaTest.Input):
"""Input model for the VerifyISISNeighborCount test."""

check_all_vrfs: bool = False
"""If enabled it verifies the all ISIS instances in all the configured vrfs. Defaults to `False` and verified the `default` vrf only."""

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyISISNeighborState."""
self.result.is_success()

# Verify the ISIS neighbor configure. If not then skip the test.
command_output = self.instance_commands[0].json_output
neighbor_details = _get_isis_neighbor_details(command_output)
if not neighbor_details:
self.result.is_skipped("No IS-IS neighbor detected")
# Verify if ISIS neighbors are configured. Skip the test if none are found.
if not (command_output := self.instance_commands[0].json_output["vrfs"]):
self.result.is_skipped("IS-IS is not configured on device")
return

# Verify that no neighbor has a session in the down state
not_full_neighbors = _get_isis_neighbor_details(command_output, neighbor_state="down")
for neighbor in not_full_neighbors:
self.result.is_failure(f"Instance: {neighbor['instance']} VRF: {neighbor['vrf']} Neighbor: {neighbor['neighbor']} - Session (adjacency) down")
vrfs_to_check = command_output
if not self.inputs.check_all_vrfs:
vrfs_to_check = {"default": command_output["default"]}

for vrf, vrf_data in vrfs_to_check.items():
for isis_instance, instace_data in vrf_data["isisInstances"].items():
neighbors = instace_data.get("neighbors", {})
for neighbor in neighbors.values():
for adjacencies in neighbor["adjacencies"]:
if adjacencies["state"] != "up":
self.result.is_failure(f"Instance: {isis_instance} VRF: {vrf} Interface: {adjacencies['interfaceName']} - Session (adjacency) down")


class VerifyISISNeighborCount(AntaTest):
Expand All @@ -149,6 +91,9 @@ class VerifyISISNeighborCount(AntaTest):
1. Validates the IS-IS neighbors configured on specified interface.
2. Validates the number of IS-IS neighbors for each interface at specified level.
!! Warning
Test supports the `default` vrf only.
Expected Results
----------------
* Success: If all of the following occur:
Expand Down Expand Up @@ -185,7 +130,7 @@ class Input(AntaTest.Input):
"""Input model for the VerifyISISNeighborCount test."""

interfaces: list[ISISInterface]
"""list of interfaces with their information."""
"""List of IS-IS interfaces with their information."""
InterfaceCount: ClassVar[type[ISISInterface]] = ISISInterface

@AntaTest.anta_test
Expand All @@ -194,18 +139,24 @@ def test(self) -> None:
self.result.is_success()

command_output = self.instance_commands[0].json_output
isis_neighbor_count = _get_isis_interface_details(command_output)
if len(isis_neighbor_count) == 0:
self.result.is_skipped("No IS-IS neighbor detected")

# Verify if ISIS neighbors are configured. Skip the test if none are found.
if not (instance_detail := get_value(command_output, "vrfs..default..isisInstances", separator="..")):
self.result.is_skipped("IS-IS is not configured on device")
return

for interface in self.inputs.interfaces:
eos_data = [ifl_data for ifl_data in isis_neighbor_count if ifl_data["interface"] == interface.name and ifl_data["level"] == interface.level]
if not eos_data:
interface_detail = {}
for instance_data in instance_detail.values():
if interface_data := get_value(instance_data, f"interfaces..{interface.name}..intfLevels..{interface.level}", separator=".."):
interface_detail = interface_data
break

if not interface_detail:
self.result.is_failure(f"{interface} - Not configured")
continue

if (act_count := eos_data[0]["count"]) != interface.count:
if (act_count := interface_detail.get("numAdjacencies", 0)) != interface.count:
self.result.is_failure(f"{interface} - Neighbor count mismatch - Expected: {interface.count} Actual: {act_count}")


Expand Down Expand Up @@ -244,9 +195,8 @@ class VerifyISISInterfaceMode(AntaTest):
```
"""

description = "Verifies interface mode for IS-IS"
categories: ClassVar[list[str]] = ["isis"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show isis interface brief", revision=1)]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show isis interface brief vrf all", revision=1)]

class Input(AntaTest.Input):
"""Input model for the VerifyISISNeighborCount test."""
Expand All @@ -261,30 +211,30 @@ def test(self) -> None:
self.result.is_success()

command_output = self.instance_commands[0].json_output

# Verify if ISIS neighbors are configured. Skip the test if none are found.
if len(command_output["vrfs"]) == 0:
self.result.is_skipped("No IS-IS neighbor detected")
return

# Check for p2p interfaces
for interface in self.inputs.interfaces:
interface_data = _get_interface_data(
interface=interface.name,
vrf=interface.vrf,
command_output=command_output,
)
# Check for correct VRF
if interface_data is not None:
interface_type = get_value(dictionary=interface_data, key="interfaceType", default="unset")
# Check for interfaceType
if interface.mode == "point-to-point" and interface.mode != interface_type:
self.result.is_failure(f"{interface} - Incorrect circuit type - Expected: point-to-point Actual: {interface_type}")
# Check for passive
elif interface.mode == "passive":
json_path = f"intfLevels.{interface.level}.passive"
if interface_data is None or get_value(dictionary=interface_data, key=json_path, default=False) is False:
self.result.is_failure(f"{interface} - Not running in passive mode")
else:
interface_detail = {}
instance_detail = get_value(command_output, f"vrfs..{interface.vrf}..isisInstances", separator="..", default={})
for instance_data in instance_detail.values():
if interface_data := get_value(instance_data, f"interfaces..{interface.name}", separator=".."):
interface_detail = interface_data
break

if not interface_detail:
self.result.is_failure(f"{interface} - Not configured")
continue

# Check for passive
if interface.mode == "passive" and get_value(interface_detail, f"intfLevels.{interface.level}.passive", default=False) is False:
self.result.is_failure(f"{interface} - Not running in passive mode")

elif interface.mode != (interface_type := get_value(interface_detail, "interfaceType", default="unset")):
self.result.is_failure(f"{interface} - Incorrect circuit type - Expected: {interface.mode} Actual: {interface_type}")


class VerifyISISSegmentRoutingAdjacencySegments(AntaTest):
Expand Down Expand Up @@ -363,9 +313,6 @@ def test(self) -> None:
f"{instance} {input_segment} - Incorrect Segment Identifier origin - Expected: {input_segment.sid_origin} Actual: {act_origin}"
)

if (endpoint := eos_segment["ipAddress"]) != str(input_segment.address):
self.result.is_failure(f"{instance} {input_segment} - Incorrect Segment endpoint - Expected: {input_segment.address} Actual: {endpoint}")

if (actual_level := eos_segment["level"]) != input_segment.level:
self.result.is_failure(f"{instance} {input_segment} - Incorrect IS-IS level - Expected: {input_segment.level} Actual: {actual_level}")

Expand Down
5 changes: 3 additions & 2 deletions examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ anta.tests.routing.generic:
maximum: 20
anta.tests.routing.isis:
- VerifyISISInterfaceMode:
# Verifies interface mode for IS-IS
# Verifies the operational mode of IS-IS Interfaces.
interfaces:
- name: Loopback0
mode: passive
Expand All @@ -612,7 +612,8 @@ anta.tests.routing.isis:
count: 2
# level is set to 2 by default
- VerifyISISNeighborState:
# Verifies all IS-IS neighbors are in UP state.
# Verifies the health of all the IS-IS neighbors.
check_all_vrfs: bool = False
- VerifyISISSegmentRoutingAdjacencySegments:
# Verifies the ISIS SR Adjacency Segments.
instances:
Expand Down
76 changes: 0 additions & 76 deletions tests/units/anta_tests/routing/test_isis.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
VerifyISISSegmentRoutingAdjacencySegments,
VerifyISISSegmentRoutingDataplane,
VerifyISISSegmentRoutingTunnels,
_get_interface_data,
)
from tests.units.anta_tests import test

Expand Down Expand Up @@ -1835,78 +1834,3 @@
},
},
]


COMMAND_OUTPUT = {
"vrfs": {
"default": {
"isisInstances": {
"CORE-ISIS": {
"interfaces": {
"Loopback0": {
"enabled": True,
"intfLevels": {
"2": {
"ipv4Metric": 10,
"sharedSecretProfile": "",
"isisAdjacencies": [],
"passive": True,
"v4Protection": "disabled",
"v6Protection": "disabled",
}
},
"areaProxyBoundary": False,
},
"Ethernet1": {
"intfLevels": {
"2": {
"ipv4Metric": 10,
"numAdjacencies": 1,
"linkId": "84",
"sharedSecretProfile": "",
"isisAdjacencies": [],
"passive": False,
"v4Protection": "link",
"v6Protection": "disabled",
}
},
"interfaceSpeed": 1000,
"areaProxyBoundary": False,
},
}
}
}
},
"EMPTY": {"isisInstances": {}},
"NO_INTERFACES": {"isisInstances": {"CORE-ISIS": {}}},
}
}
EXPECTED_LOOPBACK_0_OUTPUT = {
"enabled": True,
"intfLevels": {
"2": {
"ipv4Metric": 10,
"sharedSecretProfile": "",
"isisAdjacencies": [],
"passive": True,
"v4Protection": "disabled",
"v6Protection": "disabled",
}
},
"areaProxyBoundary": False,
}


@pytest.mark.parametrize(
("interface", "vrf", "expected_value"),
[
pytest.param("Loopback0", "WRONG_VRF", None, id="VRF_not_found"),
pytest.param("Loopback0", "EMPTY", None, id="VRF_no_ISIS_instances"),
pytest.param("Loopback0", "NO_INTERFACES", None, id="ISIS_instance_no_interfaces"),
pytest.param("Loopback42", "default", None, id="interface_not_found"),
pytest.param("Loopback0", "default", EXPECTED_LOOPBACK_0_OUTPUT, id="interface_found"),
],
)
def test__get_interface_data(interface: str, vrf: str, expected_value: dict[str, Any] | None) -> None:
"""Test anta.tests.routing.isis._get_interface_data."""
assert _get_interface_data(interface, vrf, COMMAND_OUTPUT) == expected_value

0 comments on commit 9f92ab1

Please sign in to comment.