diff --git a/homeassistant/components/dhcp/__init__.py b/homeassistant/components/dhcp/__init__.py index b9a330510c14df..adbfa70cfc7063 100644 --- a/homeassistant/components/dhcp/__init__.py +++ b/homeassistant/components/dhcp/__init__.py @@ -11,6 +11,7 @@ from scapy.config import conf from scapy.error import Scapy_Exception from scapy.layers.dhcp import DHCP +from scapy.layers.inet import IP from scapy.layers.l2 import Ether from scapy.sendrecv import AsyncSniffer @@ -31,7 +32,7 @@ from homeassistant.helpers.device_registry import format_mac from homeassistant.helpers.event import async_track_state_added_domain from homeassistant.loader import async_get_dhcp -from homeassistant.util.network import is_link_local +from homeassistant.util.network import is_invalid, is_link_local, is_loopback from .const import DOMAIN @@ -82,8 +83,14 @@ def __init__(self, hass, address_data, integration_matchers): def process_client(self, ip_address, hostname, mac_address): """Process a client.""" - if is_link_local(make_ip_address(ip_address)): - # Ignore self assigned addresses + made_ip_address = make_ip_address(ip_address) + + if ( + is_link_local(made_ip_address) + or is_loopback(made_ip_address) + or is_invalid(made_ip_address) + ): + # Ignore self assigned addresses, loopback, invalid return data = self._address_data.get(ip_address) @@ -255,7 +262,7 @@ def handle_dhcp_packet(self, packet): # DHCP request return - ip_address = _decode_dhcp_option(options, REQUESTED_ADDR) + ip_address = _decode_dhcp_option(options, REQUESTED_ADDR) or packet[IP].src hostname = _decode_dhcp_option(options, HOSTNAME) mac_address = _format_mac(packet[Ether].src) diff --git a/homeassistant/util/network.py b/homeassistant/util/network.py index c36e7f3793ad74..e714b6b6b31527 100644 --- a/homeassistant/util/network.py +++ b/homeassistant/util/network.py @@ -44,6 +44,11 @@ def is_local(address: IPv4Address | IPv6Address) -> bool: return is_loopback(address) or is_private(address) +def is_invalid(address: IPv4Address | IPv6Address) -> bool: + """Check if an address is invalid.""" + return bool(address == ip_address("0.0.0.0")) + + def is_ip_address(address: str) -> bool: """Check if a given string is an IP address.""" try: diff --git a/tests/components/dhcp/test_init.py b/tests/components/dhcp/test_init.py index de6743719c6383..001f977b8bb75b 100644 --- a/tests/components/dhcp/test_init.py +++ b/tests/components/dhcp/test_init.py @@ -48,6 +48,36 @@ b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" ) +# iRobot-AE9EC12DD3B04885BCBFA36AFB01E1CC 50:14:79:03:85:2c 192.168.1.120 +RAW_DHCP_RENEWAL = ( + b"\x00\x15\x5d\x8e\xed\x02\x50\x14\x79\x03\x85\x2c\x08\x00\x45\x00" + b"\x01\x8e\x51\xd2\x40\x00\x40\x11\x63\xa1\xc0\xa8\x01\x78\xc0\xa8" + b"\x01\x23\x00\x44\x00\x43\x01\x7a\x12\x09\x01\x01\x06\x00\xd4\xea" + b"\xb2\xfd\xff\xff\x00\x00\xc0\xa8\x01\x78\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x50\x14\x79\x03\x85\x2c\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x63\x82\x53\x63\x35\x01\x03\x39\x02\x05" + b"\xdc\x3c\x45\x64\x68\x63\x70\x63\x64\x2d\x35\x2e\x32\x2e\x31\x30" + b"\x3a\x4c\x69\x6e\x75\x78\x2d\x33\x2e\x31\x38\x2e\x37\x31\x3a\x61" + b"\x72\x6d\x76\x37\x6c\x3a\x51\x75\x61\x6c\x63\x6f\x6d\x6d\x20\x54" + b"\x65\x63\x68\x6e\x6f\x6c\x6f\x67\x69\x65\x73\x2c\x20\x49\x6e\x63" + b"\x20\x41\x50\x51\x38\x30\x30\x39\x0c\x27\x69\x52\x6f\x62\x6f\x74" + b"\x2d\x41\x45\x39\x45\x43\x31\x32\x44\x44\x33\x42\x30\x34\x38\x38" + b"\x35\x42\x43\x42\x46\x41\x33\x36\x41\x46\x42\x30\x31\x45\x31\x43" + b"\x43\x37\x08\x01\x21\x03\x06\x1c\x33\x3a\x3b\xff" +) + async def test_dhcp_match_hostname_and_macaddress(hass): """Test matching based on hostname and macaddress.""" @@ -76,6 +106,31 @@ async def test_dhcp_match_hostname_and_macaddress(hass): } +async def test_dhcp_renewal_match_hostname_and_macaddress(hass): + """Test renewal matching based on hostname and macaddress.""" + dhcp_watcher = dhcp.DHCPWatcher( + hass, + {}, + [{"domain": "mock-domain", "hostname": "irobot-*", "macaddress": "501479*"}], + ) + + packet = Ether(RAW_DHCP_RENEWAL) + + with patch.object(hass.config_entries.flow, "async_init") as mock_init: + dhcp_watcher.handle_dhcp_packet(packet) + # Ensure no change is ignored + dhcp_watcher.handle_dhcp_packet(packet) + + assert len(mock_init.mock_calls) == 1 + assert mock_init.mock_calls[0][1][0] == "mock-domain" + assert mock_init.mock_calls[0][2]["context"] == {"source": "dhcp"} + assert mock_init.mock_calls[0][2]["data"] == { + dhcp.IP_ADDRESS: "192.168.1.120", + dhcp.HOSTNAME: "irobot-ae9ec12dd3b04885bcbfa36afb01e1cc", + dhcp.MAC_ADDRESS: "50147903852c", + } + + async def test_dhcp_match_hostname(hass): """Test matching based on hostname only.""" dhcp_watcher = dhcp.DHCPWatcher( diff --git a/tests/util/test_network.py b/tests/util/test_network.py index 2cd710e1d6c467..089ef5e0ab8e87 100644 --- a/tests/util/test_network.py +++ b/tests/util/test_network.py @@ -33,6 +33,12 @@ def test_is_link_local(): assert not network_util.is_link_local(ip_address("127.0.0.1")) +def test_is_invalid(): + """Test invalid address.""" + assert network_util.is_invalid(ip_address("0.0.0.0")) + assert not network_util.is_invalid(ip_address("127.0.0.1")) + + def test_is_local(): """Test local addresses.""" assert network_util.is_local(ip_address("192.168.0.1"))