Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions homeassistant/components/dlna_dmr/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async def async_step_user(self, user_input: FlowInput = None) -> FlowResult:
return await self.async_step_manual()

self._discoveries = {
discovery.get(ssdp.ATTR_UPNP_FRIENDLY_NAME)
discovery[ssdp.ATTR_UPNP].get(ssdp.ATTR_UPNP_FRIENDLY_NAME)
or urlparse(discovery[ssdp.ATTR_SSDP_LOCATION]).hostname: discovery
for discovery in discoveries
}
Expand Down Expand Up @@ -217,7 +217,9 @@ async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult
# Abort if the device doesn't support all services required for a DmrDevice.
# Use the discovery_info instead of DmrDevice.is_profile_device to avoid
# contacting the device again.
discovery_service_list = discovery_info.get(ssdp.ATTR_UPNP_SERVICE_LIST)
discovery_service_list = discovery_info[ssdp.ATTR_UPNP].get(
ssdp.ATTR_UPNP_SERVICE_LIST
)
if not discovery_service_list:
return self.async_abort(reason="not_dmr")
discovery_service_ids = {
Expand Down Expand Up @@ -352,7 +354,7 @@ async def _async_set_info_from_discovery(
discovery_info.get(ssdp.ATTR_SSDP_NT) or discovery_info[ssdp.ATTR_SSDP_ST]
)
self._name = (
discovery_info.get(ssdp.ATTR_UPNP_FRIENDLY_NAME)
discovery_info[ssdp.ATTR_UPNP].get(ssdp.ATTR_UPNP_FRIENDLY_NAME)
or urlparse(self._location).hostname
or DEFAULT_NAME
)
Expand Down Expand Up @@ -466,14 +468,15 @@ def _is_ignored_device(discovery_info: Mapping[str, Any]) -> bool:
return True

# Is the root device not a DMR?
if discovery_info.get(ssdp.ATTR_UPNP_DEVICE_TYPE) not in DmrDevice.DEVICE_TYPES:
upnp_info = discovery_info[ssdp.ATTR_UPNP]
if upnp_info.get(ssdp.ATTR_UPNP_DEVICE_TYPE) not in DmrDevice.DEVICE_TYPES:
return True

# Special cases for devices with other discovery methods (e.g. mDNS), or
# that advertise multiple unrelated (sent in separate discovery packets)
# UPnP devices.
manufacturer = discovery_info.get(ssdp.ATTR_UPNP_MANUFACTURER, "").lower()
model = discovery_info.get(ssdp.ATTR_UPNP_MODEL_NAME, "").lower()
manufacturer = upnp_info.get(ssdp.ATTR_UPNP_MANUFACTURER, "").lower()
model = upnp_info.get(ssdp.ATTR_UPNP_MODEL_NAME, "").lower()

if manufacturer.startswith("xbmc") or model == "kodi":
# kodi
Expand Down
48 changes: 19 additions & 29 deletions homeassistant/components/ssdp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,16 @@

# Attributes for accessing info from SSDP response
ATTR_SSDP_LOCATION: Final = "ssdp_location"
ATTR_SSDP_ST = "ssdp_st"
ATTR_SSDP_NT = "ssdp_nt"
ATTR_SSDP_UDN = "ssdp_udn"
ATTR_SSDP_USN = "ssdp_usn"
ATTR_SSDP_EXT = "ssdp_ext"
ATTR_SSDP_SERVER = "ssdp_server"
ATTR_SSDP_ST: Final = "ssdp_st"
ATTR_SSDP_NT: Final = "ssdp_nt"
ATTR_SSDP_UDN: Final = "ssdp_udn"
ATTR_SSDP_USN: Final = "ssdp_usn"
ATTR_SSDP_EXT: Final = "ssdp_ext"
ATTR_SSDP_SERVER: Final = "ssdp_server"
ATTR_SSDP_BOOTID = "BOOTID.UPNP.ORG"
ATTR_SSDP_NEXTBOOTID = "NEXTBOOTID.UPNP.ORG"
# Attributes for accessing info from retrieved UPnP device description
ATTR_UPNP: Final = "upnp"
ATTR_UPNP_DEVICE_TYPE = "deviceType"
ATTR_UPNP_FRIENDLY_NAME = "friendlyName"
ATTR_UPNP_MANUFACTURER = "manufacturer"
Expand Down Expand Up @@ -108,29 +109,10 @@ class SsdpDescription(_SsdpDescriptionBase, total=False):
ssdp_server: str


class _UpnpDescriptionBase(TypedDict, total=True):
class UpnpDescription(TypedDict, total=True):
"""Compulsory keys for UPnP info."""

deviceType: str
friendlyName: str
manufacturer: str
modelName: str
UDN: str


class UpnpDescription(_UpnpDescriptionBase, total=False):
"""UPnP info with optional keys."""

manufacturerURL: str
modelDescription: str
modelNumber: str
modelURL: str
serialNumber: str
UPC: str
iconList: dict[str, list[dict[str, str]]]
serviceList: dict[str, list[dict[str, str]]]
deviceList: dict[str, Any]
presentationURL: str
upnp: dict[str, Any]


class SsdpServiceInfo(SsdpDescription, UpnpDescription, _HaServiceInfoDescription):
Expand Down Expand Up @@ -452,7 +434,9 @@ async def _ssdp_listener_callback(
if not callbacks and not matching_domains:
return

discovery_info = discovery_info_from_headers_and_description(info_with_desc)
discovery_info = discovery_info_from_headers_and_description(
info_with_desc, info_desc
)
discovery_info[ATTR_HA_MATCHING_DOMAINS] = matching_domains
ssdp_change = SSDP_SOURCE_SSDP_CHANGE_MAPPING[source]
await _async_process_callbacks(callbacks, discovery_info, ssdp_change)
Expand Down Expand Up @@ -490,7 +474,7 @@ async def _async_headers_to_discovery_info(
await self._description_cache.async_get_description_dict(location) or {}
)
return discovery_info_from_headers_and_description(
CaseInsensitiveDict(headers, **info_desc)
CaseInsensitiveDict(headers, **info_desc), info_desc
)

async def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
Expand Down Expand Up @@ -522,6 +506,7 @@ async def async_get_discovery_info_by_udn(self, udn: str) -> list[SsdpServiceInf

def discovery_info_from_headers_and_description(
info_with_desc: CaseInsensitiveDict,
info_desc: Mapping[str, str],
) -> SsdpServiceInfo:
"""Convert headers and description to discovery_info."""
info = {
Expand All @@ -537,6 +522,11 @@ def discovery_info_from_headers_and_description(
if ATTR_SSDP_ST not in info and ATTR_SSDP_NT in info:
info[ATTR_SSDP_ST] = info[ATTR_SSDP_NT]

# Duplicate upnp data
info[ATTR_UPNP] = info_desc
if ATTR_UPNP_UDN not in info[ATTR_UPNP]:
info[ATTR_UPNP][ATTR_UPNP_UDN] = udn

return cast(SsdpServiceInfo, info)


Expand Down
93 changes: 52 additions & 41 deletions tests/components/dlna_dmr/test_config_flow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Test the DLNA config flow."""
from __future__ import annotations

from copy import deepcopy
from unittest.mock import Mock

from async_upnp_client import UpnpDevice, UpnpError
Expand Down Expand Up @@ -56,33 +57,35 @@
ssdp.ATTR_SSDP_LOCATION: MOCK_DEVICE_LOCATION,
ssdp.ATTR_SSDP_UDN: MOCK_DEVICE_UDN,
ssdp.ATTR_SSDP_ST: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_UDN: MOCK_ROOT_DEVICE_UDN,
ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME,
ssdp.ATTR_UPNP_SERVICE_LIST: {
"service": [
{
"SCPDURL": "/AVTransport/scpd.xml",
"controlURL": "/AVTransport/control.xml",
"eventSubURL": "/AVTransport/event.xml",
"serviceId": "urn:upnp-org:serviceId:AVTransport",
"serviceType": "urn:schemas-upnp-org:service:AVTransport:1",
},
{
"SCPDURL": "/ConnectionManager/scpd.xml",
"controlURL": "/ConnectionManager/control.xml",
"eventSubURL": "/ConnectionManager/event.xml",
"serviceId": "urn:upnp-org:serviceId:ConnectionManager",
"serviceType": "urn:schemas-upnp-org:service:ConnectionManager:1",
},
{
"SCPDURL": "/RenderingControl/scpd.xml",
"controlURL": "/RenderingControl/control.xml",
"eventSubURL": "/RenderingControl/event.xml",
"serviceId": "urn:upnp-org:serviceId:RenderingControl",
"serviceType": "urn:schemas-upnp-org:service:RenderingControl:1",
},
]
ssdp.ATTR_UPNP: {
ssdp.ATTR_UPNP_UDN: MOCK_ROOT_DEVICE_UDN,
ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME,
ssdp.ATTR_UPNP_SERVICE_LIST: {
"service": [
{
"SCPDURL": "/AVTransport/scpd.xml",
"controlURL": "/AVTransport/control.xml",
"eventSubURL": "/AVTransport/event.xml",
"serviceId": "urn:upnp-org:serviceId:AVTransport",
"serviceType": "urn:schemas-upnp-org:service:AVTransport:1",
},
{
"SCPDURL": "/ConnectionManager/scpd.xml",
"controlURL": "/ConnectionManager/control.xml",
"eventSubURL": "/ConnectionManager/event.xml",
"serviceId": "urn:upnp-org:serviceId:ConnectionManager",
"serviceType": "urn:schemas-upnp-org:service:ConnectionManager:1",
},
{
"SCPDURL": "/RenderingControl/scpd.xml",
"controlURL": "/RenderingControl/control.xml",
"eventSubURL": "/RenderingControl/event.xml",
"serviceId": "urn:upnp-org:serviceId:RenderingControl",
"serviceType": "urn:schemas-upnp-org:service:RenderingControl:1",
},
]
},
},
ssdp.ATTR_HA_MATCHING_DOMAINS: {DLNA_DOMAIN},
}
Expand Down Expand Up @@ -548,9 +551,11 @@ async def test_ssdp_flow_existing(
data={
ssdp.ATTR_SSDP_LOCATION: NEW_DEVICE_LOCATION,
ssdp.ATTR_SSDP_UDN: MOCK_DEVICE_UDN,
ssdp.ATTR_UPNP_UDN: MOCK_ROOT_DEVICE_UDN,
ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME,
ssdp.ATTR_UPNP: {
ssdp.ATTR_UPNP_UDN: MOCK_ROOT_DEVICE_UDN,
ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME,
},
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
Expand All @@ -570,9 +575,11 @@ async def test_ssdp_flow_upnp_udn(
ssdp.ATTR_SSDP_LOCATION: NEW_DEVICE_LOCATION,
ssdp.ATTR_SSDP_UDN: MOCK_DEVICE_UDN,
ssdp.ATTR_SSDP_ST: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_UDN: "DIFFERENT_ROOT_DEVICE",
ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME,
ssdp.ATTR_UPNP: {
ssdp.ATTR_UPNP_UDN: "DIFFERENT_ROOT_DEVICE",
ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME,
},
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
Expand All @@ -583,8 +590,8 @@ async def test_ssdp_flow_upnp_udn(
async def test_ssdp_missing_services(hass: HomeAssistant) -> None:
"""Test SSDP ignores devices that are missing required services."""
# No services defined at all
discovery = dict(MOCK_DISCOVERY)
del discovery[ssdp.ATTR_UPNP_SERVICE_LIST]
discovery = deepcopy(MOCK_DISCOVERY)
del discovery[ssdp.ATTR_UPNP][ssdp.ATTR_UPNP_SERVICE_LIST]
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
Expand All @@ -594,11 +601,13 @@ async def test_ssdp_missing_services(hass: HomeAssistant) -> None:
assert result["reason"] == "not_dmr"

# AVTransport service is missing
discovery = dict(MOCK_DISCOVERY)
discovery[ssdp.ATTR_UPNP_SERVICE_LIST] = {
discovery = deepcopy(MOCK_DISCOVERY)
discovery[ssdp.ATTR_UPNP][ssdp.ATTR_UPNP_SERVICE_LIST] = {
"service": [
service
for service in discovery[ssdp.ATTR_UPNP_SERVICE_LIST]["service"]
for service in discovery[ssdp.ATTR_UPNP][ssdp.ATTR_UPNP_SERVICE_LIST][
"service"
]
if service.get("serviceId") != "urn:upnp-org:serviceId:AVTransport"
]
}
Expand All @@ -622,7 +631,9 @@ async def test_ssdp_ignore_device(hass: HomeAssistant) -> None:
assert result["reason"] == "alternative_integration"

discovery = dict(MOCK_DISCOVERY)
discovery[ssdp.ATTR_UPNP_DEVICE_TYPE] = "urn:schemas-upnp-org:device:ZonePlayer:1"
discovery[ssdp.ATTR_UPNP][
ssdp.ATTR_UPNP_DEVICE_TYPE
] = "urn:schemas-upnp-org:device:ZonePlayer:1"
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
Expand All @@ -637,8 +648,8 @@ async def test_ssdp_ignore_device(hass: HomeAssistant) -> None:
("LG Electronics.", "LG TV"),
]:
discovery = dict(MOCK_DISCOVERY)
discovery[ssdp.ATTR_UPNP_MANUFACTURER] = manufacturer
discovery[ssdp.ATTR_UPNP_MODEL_NAME] = model
discovery[ssdp.ATTR_UPNP][ssdp.ATTR_UPNP_MANUFACTURER] = manufacturer
discovery[ssdp.ATTR_UPNP][ssdp.ATTR_UPNP_MODEL_NAME] = model
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
Expand Down
23 changes: 23 additions & 0 deletions tests/components/ssdp/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ async def test_ssdp_flow_dispatched_on_st(mock_get_ssdp, hass, caplog, mock_flow
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
ssdp.ATTR_HA_MATCHING_DOMAINS: {"mock-domain"},
ssdp.ATTR_UPNP: {
ssdp.ATTR_UPNP_UDN: "uuid:mock-udn",
},
}
assert "Failed to fetch ssdp data" not in caplog.text

Expand Down Expand Up @@ -358,6 +361,10 @@ async def test_discovery_from_advertisement_sets_ssdp_st(
ssdp.ATTR_SSDP_UDN: ANY,
"nts": "ssdp:alive",
"_timestamp": ANY,
ssdp.ATTR_UPNP: {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_UDN: "uuid:mock-udn",
},
}
]

Expand Down Expand Up @@ -465,6 +472,10 @@ async def test_scan_with_registered_callback(
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
ssdp.ATTR_HA_MATCHING_DOMAINS: set(),
ssdp.ATTR_UPNP: {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
},
},
ssdp.SsdpChange.ALIVE,
)
Expand Down Expand Up @@ -521,6 +532,10 @@ async def test_getting_existing_headers(
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
ssdp.ATTR_UPNP: {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
},
}
]

Expand All @@ -538,6 +553,10 @@ async def test_getting_existing_headers(
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
ssdp.ATTR_UPNP: {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
},
}
]

Expand All @@ -554,6 +573,10 @@ async def test_getting_existing_headers(
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
ssdp.ATTR_UPNP: {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
},
}

assert (
Expand Down