Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(anta): Added testcase to verify the BGP Redistributed Routes #993

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
"""Match hostname like `my-hostname`, `my-hostname-1`, `my-hostname-1-2`."""


# Regular expression for BGP redistributed routes
REGEX_IPV4_UNICAST = r"\bipv4[\s]?unicast\b"
REGEX_IPV4_MULTICAST = r"\bipv4[\s]?multicast\b"
REGEX_IPV6_UNICAST = r"\bipv6[\s]?unicast\b"
REGEX_IPV6_MULTICAST = r"\bipv6[\s]?multicast\b"


def aaa_group_prefix(v: str) -> str:
"""Prefix the AAA method with 'group' if it is known."""
built_in_methods = ["local", "none", "logging"]
Expand Down Expand Up @@ -132,6 +139,40 @@ def validate_regex(value: str) -> str:
return value


def bgp_redistributed_route_proto_abbreviations(value: str) -> str:
"""Abbreviations for different BGP redistributed route protocols.

Examples
--------
- IPv4 Unicast
- ipv4Unicast
- IPv4 Multicast
- ipv4Multicast

"""
patterns = {REGEX_IPV4_UNICAST: "v4u", REGEX_IPV4_MULTICAST: "v4m", REGEX_IPV6_UNICAST: "v6u", REGEX_IPV6_MULTICAST: "v6m"}

for pattern, replacement in patterns.items():
match = re.search(pattern, value, re.IGNORECASE)
if match:
return replacement

return value
Comment on lines +142 to +160
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse stuff from #1021 here?

If not, let's try to follow the same structure as in #1021 (updating the docstring, etc.). Thanks

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, Carl, In this test the mapping is ipv4unicast: v4u hence we can not use the changes of #1021
hence updated structure .



def update_bgp_redistributed_proto_user(value: str) -> str:
"""Update BGP redistributed route `User` proto with EOS SDK.

Examples
--------
- User
Comment on lines +166 to +168
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the example is incomplete here.

"""
if value == "User":
value = "EOS SDK"

return value


# AntaTest.Input types
AAAAuthMethod = Annotated[str, AfterValidator(aaa_group_prefix)]
Vlan = Annotated[int, Field(ge=0, le=4094)]
Expand Down Expand Up @@ -319,3 +360,23 @@ def snmp_v3_prefix(auth_type: Literal["auth", "priv", "noauth"]) -> str:
"inVersionErrs", "inBadCommunityNames", "inBadCommunityUses", "inParseErrs", "outTooBigErrs", "outNoSuchNameErrs", "outBadValueErrs", "outGeneralErrs"
]
SnmpVersionV3AuthType = Annotated[Literal["auth", "priv", "noauth"], AfterValidator(snmp_v3_prefix)]
RedistributedProtocol = Annotated[
Literal[
"AttachedHost",
"Bgp",
"Connected",
"Dynamic",
"IS-IS",
"OSPF Internal",
"OSPF External",
"OSPF Nssa-External",
"OSPFv3 Internal",
"OSPFv3 External",
"OSPFv3 Nssa-External",
"RIP",
"Static",
"User",
],
AfterValidator(update_bgp_redistributed_proto_user),
]
RedisrbutedAfiSafi = Annotated[str, BeforeValidator(bgp_redistributed_route_proto_abbreviations)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
RedisrbutedAfiSafi = Annotated[str, BeforeValidator(bgp_redistributed_route_proto_abbreviations)]
RedistributedAfiSafi = Annotated[str, BeforeValidator(bgp_redistributed_route_proto_abbreviations)]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a Literal here instead to make sure Pydantic catches if it's not v4u, v4m, etc. You can refer to #1021.

The idea is to modify the user input (if we can) before the validation.

88 changes: 85 additions & 3 deletions anta/input_models/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pydantic import BaseModel, ConfigDict, Field, PositiveInt, model_validator
from pydantic_extra_types.mac_address import MacAddress

from anta.custom_types import Afi, BgpDropStats, BgpUpdateError, MultiProtocolCaps, Safi, Vni
from anta.custom_types import Afi, BgpDropStats, BgpUpdateError, MultiProtocolCaps, RedisrbutedAfiSafi, RedistributedProtocol, Safi, Vni
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from anta.custom_types import Afi, BgpDropStats, BgpUpdateError, MultiProtocolCaps, RedisrbutedAfiSafi, RedistributedProtocol, Safi, Vni
from anta.custom_types import Afi, BgpDropStats, BgpUpdateError, MultiProtocolCaps, RedistributedAfiSafi, RedistributedProtocol, Safi, Vni


if TYPE_CHECKING:
import sys
Expand Down Expand Up @@ -68,8 +68,10 @@ class BgpAddressFamily(BaseModel):
check_peer_state: bool = False
"""Flag to check if the peers are established with negotiated AFI/SAFI. Defaults to `False`.

Can be enabled in the `VerifyBGPPeerCount` tests.
"""
Can be enabled in the `VerifyBGPPeerCount` tests."""

route_map: str | None = None
"""Specify redistributed route protocol route map. Required field in the `VerifyBGPRedistribution` test."""
Comment on lines +71 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's needed right?


@model_validator(mode="after")
def validate_inputs(self) -> Self:
Expand Down Expand Up @@ -256,3 +258,83 @@ def __str__(self) -> str:
- Next-hop: 192.168.66.101 Origin: Igp
"""
return f"Next-hop: {self.nexthop} Origin: {self.origin}"


class BgpVrf(BaseModel):
"""Model representing BGP vrfs."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Model representing BGP vrfs."""
"""Model representing a VRF in a BGP instance."""


vrf: str = "default"
"""VRF for the BGP instance. Defaults to `default`."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""VRF for the BGP instance. Defaults to `default`."""
"""VRF context."""

address_families: list[AddressFamilyConfig]
"""list of address family configuration."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""list of address family configuration."""
"""List of address family configuration."""


def __str__(self) -> str:
"""Return a human-readable string representation of the BgpVrf for reporting.

Examples
--------
- VRF: default
"""
return f"VRF: {self.vrf}"


class RedistributedRoute(BaseModel):
"""Model representing BGP redistributed route."""
Comment on lines +281 to +282
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class RedistributedRoute(BaseModel):
"""Model representing BGP redistributed route."""
class RedistributedRouteConfig(BaseModel):
"""Model representing a BGP redistributed route configuration."""


proto: RedistributedProtocol
"""The redistributed route protocol."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""The redistributed route protocol."""
"""The redistributed protocol."""

include_leaked: bool | None = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On EOS, this value is either True or it doesn't exist. If it doesn't exist it means it's False. If a user provides False here, how will the test logic react?

"""Flag to include leaked BGP routes in the advertisement"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Flag to include leaked BGP routes in the advertisement"""
"""Flag to include leaked routes of the redistributed protocol while redistributing."""

route_map: str | None = None
"""The route map of the redistributed routes."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""The route map of the redistributed routes."""
"""Optional route map applied to the redistribution."""


@model_validator(mode="after")
def validate_include_leaked_support(self) -> Self:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def validate_include_leaked_support(self) -> Self:
def validate_inputs(self) -> Self:

"""Validate the input provided for included leaked field, included _leaked this field is not supported for proto AttachedHost, User, Dynamic, RIP."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Validate the input provided for included leaked field, included _leaked this field is not supported for proto AttachedHost, User, Dynamic, RIP."""
"""Validate that 'include_leaked' is not set when the redistributed protocol is AttachedHost, User, Dynamic, or RIP."""

if self.include_leaked and self.proto in ["AttachedHost", "EOS SDK", "Dynamic", "RIP"]:
msg = f"{self.include_leaked}, field is not supported for redistributed route protocol `{self.proto}`"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
msg = f"{self.include_leaked}, field is not supported for redistributed route protocol `{self.proto}`"
msg = f"'include_leaked' field is not supported for redistributed protocol '{self.proto}'"

raise ValueError(msg)
return self

def __str__(self) -> str:
"""Return a human-readable string representation of the RedistributedRoute for reporting.

Examples
--------
- Proto: Connected, Included Leaked: False, Route Map: RM-CONN-2-BGP
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

includedLeaked will never be false on EOS, it will just be absent.

"""
base_string = f"Proto: {self.proto}"
if self.include_leaked is not None:
base_string += f", Included Leaked: {self.include_leaked}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
base_string += f", Included Leaked: {self.include_leaked}"
base_string += f", Include Leaked: {self.include_leaked}"

if self.route_map is not None:
base_string += f", Route Map: {self.route_map}"
return base_string


class AddressFamilyConfig(BaseModel):
"""Model representing BGP address family configs."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Model representing BGP address family configs."""
"""Model representing a BGP address family configuration."""


afi_safi: RedisrbutedAfiSafi
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
afi_safi: RedisrbutedAfiSafi
afi_safi: RedistributedAfiSafi

"""BGP redistributed route supported address families"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""BGP redistributed route supported address families"""
"""AFI/SAFI abbreviation per EOS."""

redistributed_routes: list[RedistributedRoute]
"""A list of redistributed route"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""A list of redistributed route"""
"""List of redistributed route configuration."""


@model_validator(mode="after")
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the AddressFamilyConfig class.

address families must be `ipv4` or `ipv6` only, and sub address families can be `unicast` or `multicast`.
"""
if self.afi_safi not in ["v4u", "v4m", "v6u", "v6m"]:
msg = f"Redistributed route protocol is not supported for address family `{self.afi_safi}`"
raise ValueError(msg)
return self
Comment on lines +322 to +331
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this if we use a Literal with BeforeValidator as explained in my other comment.


def __str__(self) -> str:
"""Return a human-readable string representation of the AddressFamilyConfig for reporting.

Examples
--------
- AFI-SAFI: v4u
"""
return f"AFI-SAFI: {self.afi_safi}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should create a mapping to have cleaner failure messages to have something like AFI-SAFI: IPv4 Unicast instead of v4u. Thoughts?

91 changes: 90 additions & 1 deletion anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@

from pydantic import field_validator

from anta.input_models.routing.bgp import BgpAddressFamily, BgpAfi, BgpNeighbor, BgpPeer, BgpRoute, VxlanEndpoint
from anta.input_models.routing.bgp import BgpAddressFamily, BgpAfi, BgpNeighbor, BgpPeer, BgpRoute, BgpVrf, VxlanEndpoint
from anta.models import AntaCommand, AntaTemplate, AntaTest
from anta.tools import format_data, get_item, get_value

# Using a TypeVar for the BgpPeer model since mypy thinks it's a ClassVar and not a valid type when used in field validators
T = TypeVar("T", bound=BgpPeer)

# pylint: disable=C0302
# TODO: Refactor to reduce the number of lines in this module later


def _check_bgp_neighbor_capability(capability_status: dict[str, bool]) -> bool:
"""Check if a BGP neighbor capability is advertised, received, and enabled.
Expand Down Expand Up @@ -1797,3 +1800,89 @@ def test(self) -> None:
# Verify BGP and RIB nexthops are same.
if len(bgp_nexthops) != len(route_entry["vias"]):
self.result.is_failure(f"{route} - Nexthops count mismatch - BGP: {len(bgp_nexthops)}, RIB: {len(route_entry['vias'])}")


class VerifyBGPRedistribution(AntaTest):
"""Verifies BGP redistributed routes protocol and route-map.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Verifies BGP redistributed routes protocol and route-map.
"""Verifies BGP redistribution.


This test performs the following checks for each specified route:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This test performs the following checks for each specified route:
This test performs the following checks for each specified VRF in the BGP instance:


1. Ensures that the expected address-family is configured on the device.
2. Confirms that the redistributed route protocol, included leaked and route map match the expected values for a route.

Note: For "User" proto field, checking that it's "EOS SDK" versus User.

Expected Results
----------------
* Success: If all of the following conditions are met:
- The expected address-family is configured on the device.
- The redistributed route protocol, included leaked and route map align with the expected values for the route.
* Failure: If any of the following occur:
- The expected address-family is not configured on device.
- The redistributed route protocolor, included leaked or route map does not match the expected value for a route.

Examples
--------
```yaml
anta.tests.routing:
bgp:
- VerifyBGPRedistribution:
vrfs:
- vrf: default
address_families:
- afi_safi: ipv4Unicast
redistributed_routes:
- proto: Connected
include_leaked: True
route_map: RM-CONN-2-BGP
- proto: Static
include_leaked: True
route_map: RM-CONN-2-BGP
- afi_safi: IPv6 Unicast
redistributed_routes:
- proto: Dynamic
route_map: RM-CONN-2-BGP
- proto: Static
include_leaked: True
route_map: RM-CONN-2-BGP
```
"""

categories: ClassVar[list[str]] = ["bgp"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bgp instance vrf all", revision=4)]

class Input(AntaTest.Input):
"""Input model for the VerifyBGPRedistribution test."""

vrfs: list[BgpVrf]
"""List of address families."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""List of address families."""
"""List of VRFs in the BGP instance."""


def _validate_redistribute_route_details(self, vrf_data: str, address_family: dict[str, Any], afi_safi_configs: list[dict[str, Any]]) -> None:
"""Validate the redstributed route details for a given address family."""
for route_info in address_family.redistributed_routes:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this loop out of this helper method? I think it would be cleaner to have this method just check if a redistributed route config is valid or not. If it's not valid, you could maybe return the issues and call self.result.is_failure in the test method.

Goal is to separate validation logic from test result handling. Right now the helper method does validation AND sets failure status so we are mixing concerns.

# If the redistributed route protocol does not match the expected value, test fails.
if not (actual_route := get_item(afi_safi_configs.get("redistributedRoutes"), "proto", route_info.proto)):
self.result.is_failure(f"{vrf_data}, {address_family}, {route_info} - Not configured")
continue

# If includes leaked field applicable, and it does not matches the expected value, test fails.
if all([route_info.include_leaked is not None, (act_include_leaked := actual_route.get("includeLeaked", "Not Found")) != route_info.include_leaked]):
self.result.is_failure(f"{vrf_data}, {address_family}, {route_info} - Value for included leaked mismatch - Actual: {act_include_leaked}")

# If route map is required and it is not matching the expected value, test fails.
if all([route_info.route_map, (act_route_map := actual_route.get("routeMap", "Not Found")) != route_info.route_map]):
self.result.is_failure(f"{vrf_data}, {address_family}, {route_info} - Route map mismatch - Actual: {act_route_map}")

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyBGPRedistribution."""
self.result.is_success()
command_output = self.instance_commands[0].json_output

for vrf_data in self.inputs.vrfs:
for address_family in vrf_data.address_families:
# If the specified VRF, AFI-SAFI details are not found, test fails.
if not (afi_safi_configs := get_value(command_output, f"vrfs.{vrf_data.vrf}.afiSafiConfig.{address_family.afi_safi}")):
self.result.is_failure(f"{vrf_data}, {address_family} - Not configured")
continue
Comment on lines +1885 to +1887
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have 2 checks, 1 for the VRF (VRF not configured - like the other tests) and the other for the AFI/SAFI.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't forget to update the docstring to reflect this :)

self._validate_redistribute_route_details(str(vrf_data), address_family, afi_safi_configs)
20 changes: 20 additions & 0 deletions examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,26 @@ anta.tests.routing.bgp:
- VerifyBGPPeersHealthRibd:
# Verifies the health of all the BGP IPv4 peer(s).
check_tcp_queues: True
- VerifyBGPRedistribution:
# Verifies BGP redistributed routes protocol and route-map.
vrfs:
- vrf: default
address_families:
- afi_safi: ipv4Unicast
redistributed_routes:
- proto: Connected
include_leaked: True
route_map: RM-CONN-2-BGP
- proto: Static
include_leaked: True
route_map: RM-CONN-2-BGP
- afi_safi: IPv6 Unicast
redistributed_routes:
- proto: Dynamic
route_map: RM-CONN-2-BGP
- proto: Static
include_leaked: True
route_map: RM-CONN-2-BGP
- VerifyBGPRouteECMP:
# Verifies BGP IPv4 route ECMP paths.
route_entries:
Expand Down
Loading
Loading