diff --git a/homeassistant/components/ssdp/__init__.py b/homeassistant/components/ssdp/__init__.py index a84b2e690da64..149064f40d3b3 100644 --- a/homeassistant/components/ssdp/__init__.py +++ b/homeassistant/components/ssdp/__init__.py @@ -2,7 +2,7 @@ from __future__ import annotations import asyncio -from collections.abc import Awaitable +from collections.abc import Awaitable, MutableMapping from datetime import timedelta from enum import Enum from ipaddress import IPv4Address, IPv6Address @@ -33,15 +33,16 @@ # Attributes for accessing info from SSDP response ATTR_SSDP_LOCATION: Final = "ssdp_location" -ATTR_SSDP_ST = "ssdp_st" -ATTR_SSDP_NT = "ssdp_nt" -ATTR_SSDP_UDN = "ssdp_udn" -ATTR_SSDP_USN = "ssdp_usn" -ATTR_SSDP_EXT = "ssdp_ext" -ATTR_SSDP_SERVER = "ssdp_server" +ATTR_SSDP_ST: Final = "ssdp_st" +ATTR_SSDP_NT: Final = "ssdp_nt" +ATTR_SSDP_UDN: Final = "ssdp_udn" +ATTR_SSDP_USN: Final = "ssdp_usn" +ATTR_SSDP_EXT: Final = "ssdp_ext" +ATTR_SSDP_SERVER: Final = "ssdp_server" ATTR_SSDP_BOOTID = "BOOTID.UPNP.ORG" ATTR_SSDP_NEXTBOOTID = "NEXTBOOTID.UPNP.ORG" # Attributes for accessing info from retrieved UPnP device description +ATTR_UPNP: Final = "upnp" ATTR_UPNP_DEVICE_TYPE = "deviceType" ATTR_UPNP_FRIENDLY_NAME = "friendlyName" ATTR_UPNP_MANUFACTURER = "manufacturer" @@ -108,29 +109,10 @@ class SsdpDescription(_SsdpDescriptionBase, total=False): ssdp_server: str -class _UpnpDescriptionBase(TypedDict, total=True): +class UpnpDescription(TypedDict, total=True): """Compulsory keys for UPnP info.""" - deviceType: str - friendlyName: str - manufacturer: str - modelName: str - UDN: str - - -class UpnpDescription(_UpnpDescriptionBase, total=False): - """UPnP info with optional keys.""" - - manufacturerURL: str - modelDescription: str - modelNumber: str - modelURL: str - serialNumber: str - UPC: str - iconList: dict[str, list[dict[str, str]]] - serviceList: dict[str, list[dict[str, str]]] - deviceList: dict[str, Any] - presentationURL: str + upnp: dict[str, Any] class SsdpServiceInfo(SsdpDescription, UpnpDescription, _HaServiceInfoDescription): @@ -438,7 +420,6 @@ async def _ssdp_listener_callback( location = ssdp_device.location info_desc = await self._async_get_description_dict(location) or {} combined_headers = ssdp_device.combined_headers(dst) - info_with_desc = CaseInsensitiveDict(combined_headers, **info_desc) callbacks = self._async_get_matching_callbacks(combined_headers) matching_domains: set[str] = set() @@ -446,13 +427,15 @@ async def _ssdp_listener_callback( # If there are no changes from a search, do not trigger a config flow if source != SsdpSource.SEARCH_ALIVE: matching_domains = self.integration_matchers.async_matching_domains( - info_with_desc + CaseInsensitiveDict(combined_headers, **info_desc) ) if not callbacks and not matching_domains: return - discovery_info = discovery_info_from_headers_and_description(info_with_desc) + discovery_info = discovery_info_from_headers_and_description( + combined_headers, info_desc + ) discovery_info[ATTR_HA_MATCHING_DOMAINS] = matching_domains ssdp_change = SSDP_SOURCE_SSDP_CHANGE_MAPPING[source] await _async_process_callbacks(callbacks, discovery_info, ssdp_change) @@ -490,7 +473,7 @@ async def _async_headers_to_discovery_info( await self._description_cache.async_get_description_dict(location) or {} ) return discovery_info_from_headers_and_description( - CaseInsensitiveDict(headers, **info_desc) + CaseInsensitiveDict(headers), info_desc ) async def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name @@ -521,22 +504,22 @@ async def async_get_discovery_info_by_udn(self, udn: str) -> list[SsdpServiceInf def discovery_info_from_headers_and_description( - info_with_desc: CaseInsensitiveDict, + combined_headers: MutableMapping[str, Any], + info_desc: Mapping[str, str], ) -> SsdpServiceInfo: """Convert headers and description to discovery_info.""" - info = { - DISCOVERY_MAPPING.get(k.lower(), k): v - for k, v in info_with_desc.as_dict().items() - } - - if ATTR_UPNP_UDN not in info and ATTR_SSDP_USN in info: - if udn := _udn_from_usn(info[ATTR_SSDP_USN]): - info[ATTR_UPNP_UDN] = udn + info = {DISCOVERY_MAPPING.get(k.lower(), k): v for k, v in combined_headers.items()} # Increase compatibility. if ATTR_SSDP_ST not in info and ATTR_SSDP_NT in info: info[ATTR_SSDP_ST] = info[ATTR_SSDP_NT] + # Duplicate upnp data + info[ATTR_UPNP] = info_desc + if ATTR_UPNP_UDN not in info[ATTR_UPNP] and ATTR_SSDP_USN in info: + if udn := _udn_from_usn(info[ATTR_SSDP_USN]): + info[ATTR_UPNP][ATTR_UPNP_UDN] = udn + return cast(SsdpServiceInfo, info) diff --git a/tests/components/ssdp/test_init.py b/tests/components/ssdp/test_init.py index c9098726ab452..5e3368b8ff7d4 100644 --- a/tests/components/ssdp/test_init.py +++ b/tests/components/ssdp/test_init.py @@ -68,10 +68,12 @@ async def test_ssdp_flow_dispatched_on_st(mock_get_ssdp, hass, caplog, mock_flow ssdp.ATTR_SSDP_USN: "uuid:mock-udn::mock-st", ssdp.ATTR_SSDP_SERVER: "mock-server", ssdp.ATTR_SSDP_EXT: "", - ssdp.ATTR_UPNP_UDN: "uuid:mock-udn", ssdp.ATTR_SSDP_UDN: ANY, "_timestamp": ANY, ssdp.ATTR_HA_MATCHING_DOMAINS: {"mock-domain"}, + ssdp.ATTR_UPNP: { + ssdp.ATTR_UPNP_UDN: "uuid:mock-udn", + }, } assert "Failed to fetch ssdp data" not in caplog.text @@ -353,11 +355,13 @@ async def test_discovery_from_advertisement_sets_ssdp_st( ssdp.ATTR_SSDP_NT: "mock-st", ssdp.ATTR_SSDP_ST: "mock-st", # Set by ssdp component, not in original advertisement. ssdp.ATTR_SSDP_USN: "uuid:mock-udn::mock-st", - ssdp.ATTR_UPNP_UDN: "uuid:mock-udn", - ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus", ssdp.ATTR_SSDP_UDN: ANY, "nts": "ssdp:alive", "_timestamp": ANY, + ssdp.ATTR_UPNP: { + ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus", + ssdp.ATTR_UPNP_UDN: "uuid:mock-udn", + }, } ] @@ -454,17 +458,19 @@ async def test_scan_with_registered_callback( assert async_not_matching_integration_callback1.call_count == 0 assert async_integration_callback.call_args[0] == ( { - ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus", ssdp.ATTR_SSDP_EXT: "", ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1", ssdp.ATTR_SSDP_SERVER: "mock-server", ssdp.ATTR_SSDP_ST: "mock-st", ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::mock-st", - ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", "x-rincon-bootseq": "55", ssdp.ATTR_SSDP_UDN: ANY, "_timestamp": ANY, ssdp.ATTR_HA_MATCHING_DOMAINS: set(), + ssdp.ATTR_UPNP: { + ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus", + ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", + }, }, ssdp.SsdpChange.ALIVE, ) @@ -517,10 +523,12 @@ async def test_getting_existing_headers( ssdp.ATTR_SSDP_SERVER: "mock-server", ssdp.ATTR_SSDP_ST: "mock-st", ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3", - ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", - ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus", ssdp.ATTR_SSDP_UDN: ANY, "_timestamp": ANY, + ssdp.ATTR_UPNP: { + ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus", + ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", + }, } ] @@ -534,10 +542,12 @@ async def test_getting_existing_headers( ssdp.ATTR_SSDP_SERVER: "mock-server", ssdp.ATTR_SSDP_ST: "mock-st", ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3", - ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", - ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus", ssdp.ATTR_SSDP_UDN: ANY, "_timestamp": ANY, + ssdp.ATTR_UPNP: { + ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus", + ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", + }, } ] @@ -550,10 +560,12 @@ async def test_getting_existing_headers( ssdp.ATTR_SSDP_SERVER: "mock-server", ssdp.ATTR_SSDP_ST: "mock-st", ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3", - ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", - ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus", ssdp.ATTR_SSDP_UDN: ANY, "_timestamp": ANY, + ssdp.ATTR_UPNP: { + ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus", + ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", + }, } assert (