diff --git a/homeassistant/components/shelly/__init__.py b/homeassistant/components/shelly/__init__.py index 32e733265a81b2..b47aa78cb08359 100644 --- a/homeassistant/components/shelly/__init__.py +++ b/homeassistant/components/shelly/__init__.py @@ -84,6 +84,7 @@ Platform.COVER, Platform.EVENT, Platform.LIGHT, + Platform.MEDIA_PLAYER, Platform.NUMBER, Platform.SELECT, Platform.SENSOR, diff --git a/homeassistant/components/shelly/media_player.py b/homeassistant/components/shelly/media_player.py new file mode 100644 index 00000000000000..8479f2e72e779c --- /dev/null +++ b/homeassistant/components/shelly/media_player.py @@ -0,0 +1,430 @@ +"""Media player for Shelly.""" + +from __future__ import annotations + +import base64 +import binascii +from dataclasses import dataclass +import datetime +import hashlib +from typing import Any, Final, cast + +from aioshelly.const import RPC_GENERATIONS +from aioshelly.exceptions import DeviceConnectionError, InvalidAuthError, RpcCallError + +from homeassistant.components.media_player import ( + BrowseMedia, + MediaClass, + MediaPlayerDeviceClass, + MediaPlayerEntity, + MediaPlayerEntityDescription, + MediaPlayerEntityFeature, + MediaPlayerState, + MediaType, +) +from homeassistant.core import HomeAssistant, callback +from homeassistant.exceptions import HomeAssistantError +from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback +import homeassistant.util.dt as dt_util + +from .const import DOMAIN +from .coordinator import ShellyConfigEntry, ShellyRpcCoordinator +from .entity import ( + RpcEntityDescription, + ShellyRpcAttributeEntity, + async_setup_entry_rpc, + rpc_call, +) +from .utils import get_device_entry_gen + +CONTENT_TYPE_AUDIO = "audio" +CONTENT_TYPE_RADIO = "radio" + +PARALLEL_UPDATES = 0 + + +@dataclass(frozen=True, kw_only=True) +class RpcMediaPlayerDescription(RpcEntityDescription, MediaPlayerEntityDescription): + """Class to describe a Shelly RPC media player entity.""" + + +RPC_MEDIA_PLAYER_ENTITIES: Final = { + "media": RpcMediaPlayerDescription( + key="media", + device_class=MediaPlayerDeviceClass.SPEAKER, + ), +} + + +async def async_setup_entry( + hass: HomeAssistant, + config_entry: ShellyConfigEntry, + async_add_entities: AddConfigEntryEntitiesCallback, +) -> None: + """Set up media player for Shelly devices.""" + if get_device_entry_gen(config_entry) in RPC_GENERATIONS: + return _async_setup_rpc_entry(hass, config_entry, async_add_entities) + + return None + + +@callback +def _async_setup_rpc_entry( + hass: HomeAssistant, + config_entry: ShellyConfigEntry, + async_add_entities: AddConfigEntryEntitiesCallback, +) -> None: + """Set up entities for RPC device.""" + async_setup_entry_rpc( + hass, + config_entry, + async_add_entities, + RPC_MEDIA_PLAYER_ENTITIES, + ShellyRpcMediaPlayer, + ) + + +class ShellyRpcMediaPlayer(ShellyRpcAttributeEntity, MediaPlayerEntity): + """Representation of a Shelly RPC media player entity.""" + + _attr_name = None + _attr_supported_features = ( + MediaPlayerEntityFeature.PLAY + | MediaPlayerEntityFeature.PAUSE + | MediaPlayerEntityFeature.STOP + | MediaPlayerEntityFeature.NEXT_TRACK + | MediaPlayerEntityFeature.PREVIOUS_TRACK + | MediaPlayerEntityFeature.VOLUME_SET + | MediaPlayerEntityFeature.BROWSE_MEDIA + | MediaPlayerEntityFeature.PLAY_MEDIA + ) + _attr_media_content_type = MediaType.MUSIC + entity_description: RpcMediaPlayerDescription + + _last_media_position: int | None = None + _last_media_position_updated_at: datetime.datetime | None = None + + def __init__( + self, + coordinator: ShellyRpcCoordinator, + key: str, + attribute: str, + description: RpcMediaPlayerDescription, + ) -> None: + """Initialize Shelly RPC media player.""" + super().__init__(coordinator, key, attribute, description) + + @property + def _media_meta(self) -> dict[str, Any]: + """Return the media metadata.""" + return cast(dict[str, Any], self.status["playback"].get("media_meta", {})) + + @property + def state(self) -> MediaPlayerState: + """Return the state of the media player.""" + if self.status["playback"]["buffering"]: + return MediaPlayerState.BUFFERING + + if self.status["playback"]["enable"]: + return MediaPlayerState.PLAYING + + return MediaPlayerState.IDLE + + @property + def volume_level(self) -> float | None: + """Return the volume level of the media player (0..1).""" + volume = self.status["playback"]["volume"] + + return cast(float, volume) / 10 + + @property + def media_title(self) -> str | None: + """Return the title of current playing media.""" + if title := self._media_meta.get("title"): + return cast(str, title) + + return None + + @property + def media_artist(self) -> str | None: + """Return the artist of current playing media.""" + if self.status["playback"].get("media_type") == "RADIO": + return None + + if artist := self._media_meta.get("artist"): + return cast(str, artist) + + return None + + @property + def media_album_name(self) -> str | None: + """Return the album name of current playing media.""" + if self.status["playback"].get("media_type") == "RADIO": + return None + + if album := self._media_meta.get("album"): + return cast(str, album) + + return None + + @property + def media_duration(self) -> int | None: + """Return the duration of current playing media in seconds.""" + if self.status["playback"].get("media_type") == "RADIO": + return None + + if (duration := self._media_meta.get("duration")) is not None: + return cast(int, duration) // 1000 + + return None + + @property + def media_position(self) -> int | None: + """Return the current playback position in seconds.""" + if (position := self._get_updated_media_position()) is not None: + return position // 1000 + + return None + + @property + def media_position_updated_at(self) -> datetime.datetime | None: + """Return when the position was last updated.""" + self._get_updated_media_position() + + return self._last_media_position_updated_at + + @property + def media_image_url(self) -> str | None: + """Return the image URL of current playing media.""" + if (thumb := self._media_meta.get("thumb")) and thumb.startswith("http"): + return cast(str, thumb) + + return None + + @property + def media_image_remotely_accessible(self) -> bool: + """Return True if the image URL is remotely accessible.""" + return self.media_image_url is not None + + @property + def media_image_hash(self) -> str | None: + """Hash value for media image.""" + if (thumb := self._media_meta.get("thumb")) and thumb.startswith("data"): + return hashlib.sha256(thumb.encode("utf-8")).hexdigest()[:16] + return super().media_image_hash + + def _get_updated_media_position(self) -> int | None: + """Return the current playback position and update its timestamp.""" + if (position := self._media_meta.get("position")) is None: + self._last_media_position = None + self._last_media_position_updated_at = None + return None + + current_position = cast(int, position) + if current_position != self._last_media_position: + self._last_media_position = current_position + self._last_media_position_updated_at = dt_util.utcnow() + + return current_position + + async def async_get_media_image(self) -> tuple[bytes | None, str | None]: + """Fetch media image of current playing track.""" + thumb = self._media_meta["thumb"] + try: + prefix, image_data = thumb.split(",", 1) + image = base64.b64decode(image_data, validate=True) + mime = prefix.split(";", 1)[0].rsplit(":", 1)[-1] + except binascii.Error, ValueError: + return await super().async_get_media_image() + + return image, mime + + @rpc_call + async def async_media_play(self) -> None: + """Send play command.""" + if self.state != MediaPlayerState.PLAYING: + await self.coordinator.device.media_play_or_pause() + + @rpc_call + async def async_media_pause(self) -> None: + """Send pause command.""" + if self.state == MediaPlayerState.PLAYING: + await self.coordinator.device.media_play_or_pause() + + @rpc_call + async def async_media_stop(self) -> None: + """Send stop command.""" + await self.coordinator.device.media_stop() + + @rpc_call + async def async_media_next_track(self) -> None: + """Send next track command.""" + await self.coordinator.device.media_next() + + @rpc_call + async def async_media_previous_track(self) -> None: + """Send previous track command.""" + await self.coordinator.device.media_previous() + + @rpc_call + async def async_set_volume_level(self, volume: float) -> None: + """Set volume level, range 0..1.""" + await self.coordinator.device.media_set_volume(round(volume * 10)) + + async def async_browse_media( + self, + media_content_type: MediaType | str | None = None, + media_content_id: str | None = None, + ) -> BrowseMedia: + """Browse radio stations and audio files.""" + if not media_content_type: + return await self._async_browse_media_root() + + try: + if media_content_type == CONTENT_TYPE_RADIO: + return await self._async_browse_radio_stations(expanded=True) + if media_content_type == CONTENT_TYPE_AUDIO: + return await self._async_browse_audio_files(expanded=True) + except DeviceConnectionError as err: + self.coordinator.last_update_success = False + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="device_communication_action_error", + translation_placeholders={ + "entity": self.entity_id, + "device": self.coordinator.name, + }, + ) from err + except RpcCallError as err: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="rpc_call_action_error", + translation_placeholders={ + "entity": self.entity_id, + "device": self.coordinator.name, + }, + ) from err + except InvalidAuthError as err: + await self.coordinator.async_shutdown_device_and_start_reauth() + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="auth_error", + translation_placeholders={ + "device": self.coordinator.name, + }, + ) from err + + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="unsupported_media_content_type", + translation_placeholders={"media_content_type": str(media_content_type)}, + ) + + async def _async_browse_media_root(self) -> BrowseMedia: + """Return root BrowseMedia tree.""" + return BrowseMedia( + title="Shelly", + media_class=MediaClass.DIRECTORY, + media_content_type="", + media_content_id="", + children=[ + await self._async_browse_radio_stations(), + await self._async_browse_audio_files(), + ], + can_play=False, + can_expand=True, + ) + + async def _async_browse_audio_files(self, expanded: bool = False) -> BrowseMedia: + """Return BrowseMedia tree for audio files.""" + if expanded: + result: list[ + dict[str, Any] + ] = await self.coordinator.device.media_list_media() + children: list[BrowseMedia] | None = [ + BrowseMedia( + title=item["title"], + media_class=MediaClass.MUSIC, + media_content_type=CONTENT_TYPE_AUDIO, + media_content_id=str(item["id"]), + thumbnail=item["preview"], + can_play=True, + can_expand=False, + ) + for item in result + if item["type"] == "AUDIO" + ] + else: + children = None + + return BrowseMedia( + title="Audio files", + media_class=MediaClass.DIRECTORY, + media_content_type=CONTENT_TYPE_AUDIO, + media_content_id=CONTENT_TYPE_AUDIO, + children_media_class=MediaClass.MUSIC, + children=children, + can_play=False, + can_expand=True, + ) + + async def _async_browse_radio_stations(self, expanded: bool = False) -> BrowseMedia: + """Return BrowseMedia tree for radio stations.""" + if expanded: + result: list[ + dict[str, Any] + ] = await self.coordinator.device.media_list_radio_stations() + children: list[BrowseMedia] | None = [ + BrowseMedia( + title=station["name"], + media_class=MediaClass.MUSIC, + media_content_type=CONTENT_TYPE_RADIO, + media_content_id=str(station["id"]), + thumbnail=station["icon"], + can_play=True, + can_expand=False, + ) + for station in result + ] + else: + children = None + + return BrowseMedia( + title="Radio stations", + media_class=MediaClass.DIRECTORY, + media_content_type=CONTENT_TYPE_RADIO, + media_content_id=CONTENT_TYPE_RADIO, + children_media_class=MediaClass.MUSIC, + children=children, + can_play=False, + can_expand=True, + ) + + @rpc_call + async def async_play_media( + self, + media_type: MediaType | str, + media_id: str, + **kwargs: Any, + ) -> None: + """Play media by type and id.""" + if media_id.isdecimal() is False: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="unsupported_media_id", + translation_placeholders={"media_id": media_id}, + ) + + if media_type == CONTENT_TYPE_RADIO: + await self.coordinator.device.media_play_radio_station(int(media_id)) + return + + if media_type == CONTENT_TYPE_AUDIO: + await self.coordinator.device.media_play_media(int(media_id)) + return + + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="unsupported_media_type", + translation_placeholders={"media_type": str(media_type)}, + ) diff --git a/homeassistant/components/shelly/strings.json b/homeassistant/components/shelly/strings.json index 1a2fc3513f6f52..9d370f2e00a515 100644 --- a/homeassistant/components/shelly/strings.json +++ b/homeassistant/components/shelly/strings.json @@ -651,6 +651,15 @@ "rpc_call_error": { "message": "RPC call error occurred for {device}" }, + "unsupported_media_content_type": { + "message": "Unsupported media content type for Shelly device: {media_content_type}" + }, + "unsupported_media_id": { + "message": "Unsupported media ID for Shelly device: {media_id}" + }, + "unsupported_media_type": { + "message": "Unsupported media type for Shelly device: {media_type}" + }, "update_error": { "message": "An error occurred while retrieving data from {device}" }, diff --git a/tests/components/shelly/snapshots/test_devices.ambr b/tests/components/shelly/snapshots/test_devices.ambr index 83ea4a988bfe3e..2db79151229f1b 100644 --- a/tests/components/shelly/snapshots/test_devices.ambr +++ b/tests/components/shelly/snapshots/test_devices.ambr @@ -5973,6 +5973,61 @@ 'state': 'unknown', }) # --- +# name: test_device[wall_display_xl][media_player.test_name-entry] + EntityRegistryEntrySnapshot({ + 'aliases': list([ + None, + ]), + 'area_id': None, + 'capabilities': dict({ + }), + 'config_entry_id': , + 'config_subentry_id': , + 'device_class': None, + 'device_id': , + 'disabled_by': None, + 'domain': 'media_player', + 'entity_category': None, + 'entity_id': 'media_player.test_name', + 'has_entity_name': True, + 'hidden_by': None, + 'icon': None, + 'id': , + 'labels': set({ + }), + 'name': None, + 'object_id_base': None, + 'options': dict({ + }), + 'original_device_class': , + 'original_icon': None, + 'original_name': None, + 'platform': 'shelly', + 'previous_unique_id': None, + 'suggested_object_id': None, + 'supported_features': , + 'translation_key': None, + 'unique_id': '123456789ABC-media-media', + 'unit_of_measurement': None, + }) +# --- +# name: test_device[wall_display_xl][media_player.test_name-state] + StateSnapshot({ + 'attributes': ReadOnlyDict({ + 'device_class': 'speaker', + 'friendly_name': 'Test name', + 'media_content_type': , + 'supported_features': , + 'volume_level': 0.7, + }), + 'context': , + 'entity_id': 'media_player.test_name', + 'last_changed': , + 'last_reported': , + 'last_updated': , + 'state': 'idle', + }) +# --- # name: test_device[wall_display_xl][sensor.test_name_humidity-entry] EntityRegistryEntrySnapshot({ 'aliases': list([ diff --git a/tests/components/shelly/snapshots/test_media_player.ambr b/tests/components/shelly/snapshots/test_media_player.ambr new file mode 100644 index 00000000000000..d4b527d52867c1 --- /dev/null +++ b/tests/components/shelly/snapshots/test_media_player.ambr @@ -0,0 +1,58 @@ +# serializer version: 1 +# name: test_rpc_media_player[media_player.test_name-entry] + EntityRegistryEntrySnapshot({ + 'aliases': list([ + None, + ]), + 'area_id': None, + 'capabilities': dict({ + }), + 'config_entry_id': , + 'config_subentry_id': , + 'device_class': None, + 'device_id': , + 'disabled_by': None, + 'domain': 'media_player', + 'entity_category': None, + 'entity_id': 'media_player.test_name', + 'has_entity_name': True, + 'hidden_by': None, + 'icon': None, + 'id': , + 'labels': set({ + }), + 'name': None, + 'object_id_base': None, + 'options': dict({ + }), + 'original_device_class': , + 'original_icon': None, + 'original_name': None, + 'platform': 'shelly', + 'previous_unique_id': None, + 'suggested_object_id': None, + 'supported_features': , + 'translation_key': None, + 'unique_id': '123456789ABC-media-media', + 'unit_of_measurement': None, + }) +# --- +# name: test_rpc_media_player[media_player.test_name-state] + StateSnapshot({ + 'attributes': ReadOnlyDict({ + 'device_class': 'speaker', + 'entity_picture': 'https://www.radio_station.pl/icon.png', + 'friendly_name': 'Test name', + 'media_content_type': , + 'media_title': 'Radio Station', + 'supported_features': , + 'volume_level': 0.5, + }), + 'context': , + 'entity_id': 'media_player.test_name', + 'last_changed': , + 'last_reported': , + 'last_updated': , + 'state': 'playing', + }) +# --- diff --git a/tests/components/shelly/test_media_player.py b/tests/components/shelly/test_media_player.py new file mode 100644 index 00000000000000..d1f1b6ce74e35c --- /dev/null +++ b/tests/components/shelly/test_media_player.py @@ -0,0 +1,633 @@ +"""Tests for Shelly media player platform.""" + +from copy import deepcopy +from unittest.mock import Mock + +from aioshelly.const import MODEL_WALL_DISPLAY +from aioshelly.exceptions import DeviceConnectionError, InvalidAuthError, RpcCallError +import pytest +from syrupy.assertion import SnapshotAssertion +from syrupy.filters import props + +from homeassistant.components.media_player import ( + ATTR_MEDIA_ALBUM_NAME, + ATTR_MEDIA_ARTIST, + ATTR_MEDIA_CONTENT_ID, + ATTR_MEDIA_CONTENT_TYPE, + ATTR_MEDIA_DURATION, + ATTR_MEDIA_POSITION, + ATTR_MEDIA_TITLE, + ATTR_MEDIA_VOLUME_LEVEL, + DOMAIN as MEDIA_PLAYER_DOMAIN, + SERVICE_MEDIA_NEXT_TRACK, + SERVICE_MEDIA_PAUSE, + SERVICE_MEDIA_PLAY, + SERVICE_MEDIA_PREVIOUS_TRACK, + SERVICE_MEDIA_STOP, + SERVICE_PLAY_MEDIA, + SERVICE_VOLUME_SET, +) +from homeassistant.components.shelly.media_player import ( + CONTENT_TYPE_AUDIO, + CONTENT_TYPE_RADIO, +) +from homeassistant.const import ( + ATTR_ENTITY_ID, + STATE_BUFFERING, + STATE_IDLE, + STATE_PLAYING, + Platform, +) +from homeassistant.core import HomeAssistant +from homeassistant.exceptions import HomeAssistantError +from homeassistant.helpers.entity_registry import EntityRegistry + +from . import init_integration, patch_platforms + +from tests.typing import ClientSessionGenerator, WebSocketGenerator + +ENTITY_ID = f"{MEDIA_PLAYER_DOMAIN}.test_name" + +AUDIO_FILES = [ + { + "album": "Album Placeholder", + "artist": "Artist Placeholder", + "duration": 106000, + "filename": "track_alpha.mp3", + "id": 16, + "index": 0, + "preview": "https://example.com/media/thumb?id=16&_t=track_alpha.mp3", + "size": 3390000, + "title": "Track Alpha", + "track": 0, + "type": "AUDIO", + "valid": True, + "year": 0, + }, + { + "album": "Album Placeholder", + "artist": "Artist Placeholder", + "duration": 138000, + "filename": "track_beta.mp3", + "id": 15, + "index": 0, + "preview": "https://example.com/media/thumb?id=15&_t=track_beta.mp3", + "size": 4425000, + "title": "Track Beta", + "track": 0, + "type": "AUDIO", + "valid": True, + "year": 0, + }, + { + "filename": "ringtone_gamma.mp3", + "id": 17, + "index": 0, + "preview": "https://example.com/media/thumb?id=17&_t=ringtone_gamma.mp3", + "size": 552000, + "title": "Ringtone Gamma", + "type": "RINGTONE", + "valid": True, + }, +] + +RADIO_STATIONS = [ + { + "id": 0, + "name": "Station Alpha", + "country_code": "XX", + "icon": "https://example.com/icons/alpha.png", + }, + { + "id": 1, + "name": "Station Beta", + "country_code": "XX", + "icon": "https://example.com/icons/beta.png", + }, + { + "id": 2, + "name": "Station Gamma", + "country_code": "XX", + "icon": "https://example.com/icons/gamma.png", + }, + { + "id": 3, + "name": "Station Delta", + "country_code": "XX", + "icon": "https://example.com/icons/delta.png", + }, +] +STATUS_RADIO_STATION = { + "playback": { + "enable": True, + "buffering": False, + "volume": 5, + "media_meta": { + "thumb": "https://www.radio_station.pl/icon.png", + "title": "Radio Station", + }, + "media_type": "RADIO", + }, +} +STATUS_AUDIO_FILE = { + "playback": { + "buffering": False, + "enable": True, + "volume": 2, + "media_meta": { + "album": "Album Name", + "artist": "Artist", + "duration": 132415, + "position": 64644, + "thumb": "data:image/webp;base64,UklGRkAAAABXRUJQVlA4WAoAAAAQAAAAAAAAAAAAQUxQSAIAAAAAAFZQOCAYAAAAMAEAnQEqAQABAAFAJiWkAANwAP79NmgA", + "title": "Title", + }, + "media_type": "AUDIO", + } +} + + +@pytest.fixture(autouse=True) +def fixture_platforms(): + """Limit platforms under test.""" + with patch_platforms([Platform.MEDIA_PLAYER]): + yield + + +async def test_rpc_media_player( + hass: HomeAssistant, + entity_registry: EntityRegistry, + mock_rpc_device: Mock, + monkeypatch: pytest.MonkeyPatch, + snapshot: SnapshotAssertion, +) -> None: + """Test a Shelly RPC media player.""" + status = deepcopy(mock_rpc_device.status) + status["media"] = STATUS_RADIO_STATION + monkeypatch.setattr(mock_rpc_device, "status", status) + + await init_integration(hass, 2, model=MODEL_WALL_DISPLAY) + + assert (state := hass.states.get(ENTITY_ID)) + assert state == snapshot( + name=f"{ENTITY_ID}-state", exclude=props("entity_picture_local") + ) + + assert (entry := entity_registry.async_get(ENTITY_ID)) + assert entry == snapshot(name=f"{ENTITY_ID}-entry") + + monkeypatch.setitem(mock_rpc_device.status["media"]["playback"], "enable", False) + monkeypatch.setitem(mock_rpc_device.status["media"]["playback"], "buffering", True) + mock_rpc_device.mock_update() + + assert (state := hass.states.get(ENTITY_ID)) + assert state.state == STATE_BUFFERING + + +async def test_rpc_media_player_audio_file( + hass: HomeAssistant, + mock_rpc_device: Mock, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test a Shelly RPC media player.""" + status = deepcopy(mock_rpc_device.status) + status["media"] = STATUS_AUDIO_FILE + monkeypatch.setattr(mock_rpc_device, "status", status) + + await init_integration(hass, 2, model=MODEL_WALL_DISPLAY) + + assert (state := hass.states.get(ENTITY_ID)) + assert state.state == STATE_PLAYING + assert state.attributes[ATTR_MEDIA_TITLE] == "Title" + assert state.attributes[ATTR_MEDIA_ARTIST] == "Artist" + assert state.attributes[ATTR_MEDIA_ALBUM_NAME] == "Album Name" + assert state.attributes[ATTR_MEDIA_DURATION] == 132 + assert state.attributes[ATTR_MEDIA_POSITION] == 64 + assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 0.2 + + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_MEDIA_PAUSE, + {ATTR_ENTITY_ID: ENTITY_ID}, + blocking=True, + ) + monkeypatch.setitem(mock_rpc_device.status["media"]["playback"], "enable", False) + mock_rpc_device.mock_update() + + mock_rpc_device.media_play_or_pause.assert_called_once() + assert (state := hass.states.get(ENTITY_ID)) + assert state.state == STATE_IDLE + + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_MEDIA_PLAY, + {ATTR_ENTITY_ID: ENTITY_ID}, + blocking=True, + ) + monkeypatch.setitem(mock_rpc_device.status["media"]["playback"], "enable", True) + mock_rpc_device.mock_update() + + assert len(mock_rpc_device.media_play_or_pause.mock_calls) == 2 + assert (state := hass.states.get(ENTITY_ID)) + assert state.state == STATE_PLAYING + + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_MEDIA_STOP, + {ATTR_ENTITY_ID: ENTITY_ID}, + blocking=True, + ) + monkeypatch.setitem(mock_rpc_device.status["media"]["playback"], "enable", False) + mock_rpc_device.mock_update() + + mock_rpc_device.media_stop.assert_called_once() + assert (state := hass.states.get(ENTITY_ID)) + assert state.state == STATE_IDLE + + +async def test_rpc_media_player_actions( + hass: HomeAssistant, + mock_rpc_device: Mock, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test a Shelly RPC media player.""" + status = deepcopy(mock_rpc_device.status) + status["media"] = STATUS_AUDIO_FILE + monkeypatch.setattr(mock_rpc_device, "status", status) + + await init_integration(hass, 2, model=MODEL_WALL_DISPLAY) + + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_MEDIA_NEXT_TRACK, + {ATTR_ENTITY_ID: ENTITY_ID}, + blocking=True, + ) + + mock_rpc_device.media_next.assert_called_once() + + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_MEDIA_PREVIOUS_TRACK, + {ATTR_ENTITY_ID: ENTITY_ID}, + blocking=True, + ) + mock_rpc_device.mock_update() + + mock_rpc_device.media_previous.assert_called_once() + + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_VOLUME_SET, + {ATTR_ENTITY_ID: ENTITY_ID, ATTR_MEDIA_VOLUME_LEVEL: 0.5}, + blocking=True, + ) + + mock_rpc_device.media_set_volume.assert_called_once_with(5) + + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_PLAY_MEDIA, + { + ATTR_ENTITY_ID: ENTITY_ID, + ATTR_MEDIA_CONTENT_TYPE: CONTENT_TYPE_AUDIO, + ATTR_MEDIA_CONTENT_ID: "12", + }, + blocking=True, + ) + + mock_rpc_device.media_play_media.assert_called_once_with(12) + + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_PLAY_MEDIA, + { + ATTR_ENTITY_ID: ENTITY_ID, + ATTR_MEDIA_CONTENT_TYPE: CONTENT_TYPE_RADIO, + ATTR_MEDIA_CONTENT_ID: "2", + }, + blocking=True, + ) + + mock_rpc_device.media_play_radio_station.assert_called_once_with(2) + + +async def test_rpc_media_player_play_media_errors( + hass: HomeAssistant, + mock_rpc_device: Mock, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test a Shelly RPC errors in play media method.""" + status = deepcopy(mock_rpc_device.status) + status["media"] = STATUS_AUDIO_FILE + monkeypatch.setattr(mock_rpc_device, "status", status) + + await init_integration(hass, 2, model=MODEL_WALL_DISPLAY) + + with pytest.raises( + HomeAssistantError, match="Unsupported media ID for Shelly device: invalid" + ): + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_PLAY_MEDIA, + { + ATTR_ENTITY_ID: ENTITY_ID, + ATTR_MEDIA_CONTENT_TYPE: CONTENT_TYPE_RADIO, + ATTR_MEDIA_CONTENT_ID: "invalid", + }, + blocking=True, + ) + + with pytest.raises( + HomeAssistantError, match="Unsupported media type for Shelly device: invalid" + ): + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_PLAY_MEDIA, + { + ATTR_ENTITY_ID: ENTITY_ID, + ATTR_MEDIA_CONTENT_TYPE: "invalid", + ATTR_MEDIA_CONTENT_ID: "1", + }, + blocking=True, + ) + + +async def test_get_image_http( + hass: HomeAssistant, + mock_rpc_device: Mock, + monkeypatch: pytest.MonkeyPatch, + hass_client_no_auth: ClientSessionGenerator, +) -> None: + """Test get image via http command.""" + status = deepcopy(mock_rpc_device.status) + status["media"] = STATUS_AUDIO_FILE + monkeypatch.setattr(mock_rpc_device, "status", status) + + await init_integration(hass, 2, model=MODEL_WALL_DISPLAY) + + state = hass.states.get(ENTITY_ID) + assert "entity_picture_local" not in state.attributes + + client = await hass_client_no_auth() + + resp = await client.get(state.attributes["entity_picture"]) + content = await resp.read() + + assert isinstance(content, bytes) + + +async def test_get_image_http_base64_decode_error( + hass: HomeAssistant, + mock_rpc_device: Mock, + monkeypatch: pytest.MonkeyPatch, + hass_client_no_auth: ClientSessionGenerator, +) -> None: + """Test get image via http command base64 decode error.""" + status = deepcopy(mock_rpc_device.status) + status["media"] = STATUS_AUDIO_FILE + status["media"]["playback"]["media_meta"]["thumb"] = "data:image/webp;base64,0" + monkeypatch.setattr(mock_rpc_device, "status", status) + + await init_integration(hass, 2, model=MODEL_WALL_DISPLAY) + + state = hass.states.get(ENTITY_ID) + assert "entity_picture_local" not in state.attributes + + client = await hass_client_no_auth() + + resp = await client.get(state.attributes["entity_picture"]) + content = await resp.read() + + assert isinstance(content, bytes) + + +async def test_rpc_media_player_browse_media_root( + hass: HomeAssistant, + mock_rpc_device: Mock, + monkeypatch: pytest.MonkeyPatch, + hass_ws_client: WebSocketGenerator, +) -> None: + """Test Shelly media player browse media root.""" + status = deepcopy(mock_rpc_device.status) + status["media"] = STATUS_AUDIO_FILE + monkeypatch.setattr(mock_rpc_device, "status", status) + + await init_integration(hass, 2, model=MODEL_WALL_DISPLAY) + + websocket_client = await hass_ws_client(hass) + await websocket_client.send_json( + { + "id": 1, + "type": "media_player/browse_media", + "entity_id": ENTITY_ID, + } + ) + + msg = await websocket_client.receive_json() + + assert msg["success"] + assert msg["result"]["title"] == "Shelly" + assert msg["result"]["media_class"] == "directory" + assert msg["result"]["media_content_id"] == "" + assert [child["title"] for child in msg["result"]["children"]] == [ + "Radio stations", + "Audio files", + ] + assert [child["media_content_type"] for child in msg["result"]["children"]] == [ + CONTENT_TYPE_RADIO, + CONTENT_TYPE_AUDIO, + ] + + +async def test_rpc_media_player_browse_media_radio_stations( + hass: HomeAssistant, + mock_rpc_device: Mock, + monkeypatch: pytest.MonkeyPatch, + hass_ws_client: WebSocketGenerator, +) -> None: + """Test Shelly media player browse media radio stations.""" + status = deepcopy(mock_rpc_device.status) + status["media"] = STATUS_RADIO_STATION + monkeypatch.setattr(mock_rpc_device, "status", status) + mock_rpc_device.media_list_radio_stations.return_value = RADIO_STATIONS + + await init_integration(hass, 2, model=MODEL_WALL_DISPLAY) + + websocket_client = await hass_ws_client(hass) + await websocket_client.send_json( + { + "id": 1, + "type": "media_player/browse_media", + "entity_id": ENTITY_ID, + "media_content_type": CONTENT_TYPE_RADIO, + "media_content_id": CONTENT_TYPE_RADIO, + } + ) + + msg = await websocket_client.receive_json() + + assert msg["success"] + assert msg["result"]["title"] == "Radio stations" + assert msg["result"]["media_class"] == "directory" + assert msg["result"]["media_content_type"] == CONTENT_TYPE_RADIO + assert [child["title"] for child in msg["result"]["children"]] == [ + station["name"] for station in RADIO_STATIONS + ] + assert [child["media_content_id"] for child in msg["result"]["children"]] == [ + str(station["id"]) for station in RADIO_STATIONS + ] + assert [child["thumbnail"] for child in msg["result"]["children"]] == [ + station["icon"] for station in RADIO_STATIONS + ] + + +async def test_rpc_media_player_browse_media_audio_files( + hass: HomeAssistant, + mock_rpc_device: Mock, + monkeypatch: pytest.MonkeyPatch, + hass_ws_client: WebSocketGenerator, +) -> None: + """Test Shelly media player browse media audio files.""" + status = deepcopy(mock_rpc_device.status) + status["media"] = STATUS_AUDIO_FILE + monkeypatch.setattr(mock_rpc_device, "status", status) + mock_rpc_device.media_list_media.return_value = AUDIO_FILES + + await init_integration(hass, 2, model=MODEL_WALL_DISPLAY) + + websocket_client = await hass_ws_client(hass) + await websocket_client.send_json( + { + "id": 1, + "type": "media_player/browse_media", + "entity_id": ENTITY_ID, + "media_content_type": CONTENT_TYPE_AUDIO, + "media_content_id": CONTENT_TYPE_AUDIO, + } + ) + + msg = await websocket_client.receive_json() + + assert msg["success"] + assert msg["result"]["title"] == "Audio files" + assert msg["result"]["media_class"] == "directory" + assert msg["result"]["media_content_type"] == CONTENT_TYPE_AUDIO + assert [child["title"] for child in msg["result"]["children"]] == [ + item["title"] for item in AUDIO_FILES if item["type"] == "AUDIO" + ] + assert [child["media_content_id"] for child in msg["result"]["children"]] == [ + str(item["id"]) for item in AUDIO_FILES if item["type"] == "AUDIO" + ] + assert [child["thumbnail"] for child in msg["result"]["children"]] == [ + item["preview"] for item in AUDIO_FILES if item["type"] == "AUDIO" + ] + + +async def test_rpc_media_player_browse_media_unsupported_media_type( + hass: HomeAssistant, + mock_rpc_device: Mock, + monkeypatch: pytest.MonkeyPatch, + hass_ws_client: WebSocketGenerator, +) -> None: + """Test Shelly media player browse media returns unsupported media content type.""" + status = deepcopy(mock_rpc_device.status) + status["media"] = STATUS_AUDIO_FILE + monkeypatch.setattr(mock_rpc_device, "status", status) + mock_rpc_device.media_list_media.return_value = AUDIO_FILES + + await init_integration(hass, 2, model=MODEL_WALL_DISPLAY) + + websocket_client = await hass_ws_client(hass) + await websocket_client.send_json( + { + "id": 1, + "type": "media_player/browse_media", + "entity_id": ENTITY_ID, + "media_content_type": "invalid", + "media_content_id": CONTENT_TYPE_AUDIO, + } + ) + + msg = await websocket_client.receive_json() + + assert msg["error"] + assert msg["error"]["code"] == "home_assistant_error" + assert msg["error"]["message"] == ( + "Unsupported media content type for Shelly device: invalid" + ) + + +@pytest.mark.parametrize( + ("side_effect", "expected_message"), + [ + ( + DeviceConnectionError, + "Device communication error occurred while calling action for media_player.test_name of Test name", + ), + ( + RpcCallError(999), + "RPC call error occurred while calling action for media_player.test_name of Test name", + ), + ( + InvalidAuthError, + "Authentication failed for Test name, please update your credentials", + ), + ], +) +async def test_rpc_media_player_browse_media_errors( + hass: HomeAssistant, + mock_rpc_device: Mock, + monkeypatch: pytest.MonkeyPatch, + hass_ws_client: WebSocketGenerator, + side_effect: Exception, + expected_message: str, +) -> None: + """Test Shelly media player browse media returns errors.""" + status = deepcopy(mock_rpc_device.status) + status["media"] = STATUS_AUDIO_FILE + monkeypatch.setattr(mock_rpc_device, "status", status) + mock_rpc_device.media_list_media.side_effect = side_effect + + await init_integration(hass, 2, model=MODEL_WALL_DISPLAY) + + websocket_client = await hass_ws_client(hass) + await websocket_client.send_json( + { + "id": 1, + "type": "media_player/browse_media", + "entity_id": ENTITY_ID, + "media_content_type": CONTENT_TYPE_AUDIO, + "media_content_id": CONTENT_TYPE_AUDIO, + } + ) + + msg = await websocket_client.receive_json() + + assert msg["error"] + assert msg["error"]["code"] == "home_assistant_error" + assert msg["error"]["message"] == expected_message + + +async def test_rpc_media_player_no_media_meta( + hass: HomeAssistant, + entity_registry: EntityRegistry, + mock_rpc_device: Mock, + monkeypatch: pytest.MonkeyPatch, + snapshot: SnapshotAssertion, +) -> None: + """Test a Shelly RPC media player with no media metadata.""" + status = deepcopy(mock_rpc_device.status) + status["media"] = STATUS_AUDIO_FILE + status["media"]["playback"].pop("media_meta") + monkeypatch.setattr(mock_rpc_device, "status", status) + + await init_integration(hass, 2, model=MODEL_WALL_DISPLAY) + + assert (state := hass.states.get(ENTITY_ID)) + assert state.state == STATE_PLAYING + assert state.attributes.get(ATTR_MEDIA_TITLE) is None + assert state.attributes.get(ATTR_MEDIA_ARTIST) is None + assert state.attributes.get(ATTR_MEDIA_ALBUM_NAME) is None + assert state.attributes.get(ATTR_MEDIA_DURATION) is None + assert state.attributes.get(ATTR_MEDIA_POSITION) is None