Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 61 additions & 7 deletions homeassistant/components/cast/media_player.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from collections.abc import Callable
from contextlib import suppress
from datetime import datetime
from functools import wraps
import json
import logging
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar

import pychromecast
from pychromecast.controllers.homeassistant import HomeAssistantController
Expand All @@ -19,6 +20,7 @@
)
from pychromecast.controllers.multizone import MultizoneManager
from pychromecast.controllers.receiver import VOLUME_CONTROL_TYPE_FIXED
from pychromecast.error import PyChromecastError
from pychromecast.quick_play import quick_play
from pychromecast.socket_client import (
CONNECTION_STATUS_CONNECTED,
Expand Down Expand Up @@ -84,6 +86,34 @@
CAST_SPLASH = "https://www.home-assistant.io/images/cast/splash.png"


_CastDeviceT = TypeVar("_CastDeviceT", bound="CastDevice")
_R = TypeVar("_R")
_P = ParamSpec("_P")

_FuncType = Callable[Concatenate[_CastDeviceT, _P], _R]
_ReturnFuncType = Callable[Concatenate[_CastDeviceT, _P], _R]


def api_error(
func: _FuncType[_CastDeviceT, _P, _R],
) -> _ReturnFuncType[_CastDeviceT, _P, _R]:
"""Handle PyChromecastError and reraise a HomeAssistantError."""

@wraps(func)
def wrapper(self: _CastDeviceT, *args: _P.args, **kwargs: _P.kwargs) -> _R:
"""Wrap a CastDevice method."""
try:
return_value = func(self, *args, **kwargs)
except PyChromecastError as err:
raise HomeAssistantError(
f"{self.__class__.__name__}.{func.__name__} Failed: {err}"
) from err

return return_value

return wrapper


@callback
def _async_create_cast_device(hass: HomeAssistant, info: ChromecastInfo):
"""Create a CastDevice entity or dynamic group from the chromecast object.
Expand Down Expand Up @@ -478,6 +508,21 @@ def _media_controller(self):

return media_controller

@api_error
def _quick_play(self, app_name: str, data: dict[str, Any]) -> None:
"""Launch the app `app_name` and start playing media defined by `data`."""
quick_play(self._get_chromecast(), app_name, data)

@api_error
def _quit_app(self) -> None:
"""Quit the currently running app."""
self._get_chromecast().quit_app()

@api_error
def _start_app(self, app_id: str) -> None:
"""Start an app."""
self._get_chromecast().start_app(app_id)

def turn_on(self) -> None:
"""Turn on the cast device."""

Expand All @@ -488,52 +533,61 @@ def turn_on(self) -> None:

if chromecast.app_id is not None:
# Quit the previous app before starting splash screen or media player
chromecast.quit_app()
self._quit_app()

# The only way we can turn the Chromecast is on is by launching an app
if chromecast.cast_type == pychromecast.const.CAST_TYPE_CHROMECAST:
app_data = {"media_id": CAST_SPLASH, "media_type": "image/png"}
quick_play(chromecast, "default_media_receiver", app_data)
self._quick_play("default_media_receiver", app_data)
else:
chromecast.start_app(pychromecast.config.APP_MEDIA_RECEIVER)
self._start_app(pychromecast.config.APP_MEDIA_RECEIVER)

@api_error
def turn_off(self) -> None:
"""Turn off the cast device."""
self._get_chromecast().quit_app()

@api_error
def mute_volume(self, mute: bool) -> None:
"""Mute the volume."""
self._get_chromecast().set_volume_muted(mute)

@api_error
def set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1."""
self._get_chromecast().set_volume(volume)

@api_error
def media_play(self) -> None:
"""Send play command."""
media_controller = self._media_controller()
media_controller.play()

@api_error
def media_pause(self) -> None:
"""Send pause command."""
media_controller = self._media_controller()
media_controller.pause()

@api_error
def media_stop(self) -> None:
"""Send stop command."""
media_controller = self._media_controller()
media_controller.stop()

@api_error
def media_previous_track(self) -> None:
"""Send previous track command."""
media_controller = self._media_controller()
media_controller.queue_prev()

@api_error
def media_next_track(self) -> None:
"""Send next track command."""
media_controller = self._media_controller()
media_controller.queue_next()

@api_error
def media_seek(self, position: float) -> None:
"""Seek the media to a specific location."""
media_controller = self._media_controller()
Expand Down Expand Up @@ -646,7 +700,7 @@ async def async_play_media(
if "app_id" in app_data:
app_id = app_data.pop("app_id")
_LOGGER.info("Starting Cast app by ID %s", app_id)
await self.hass.async_add_executor_job(chromecast.start_app, app_id)
await self.hass.async_add_executor_job(self._start_app, app_id)
if app_data:
_LOGGER.warning(
"Extra keys %s were ignored. Please use app_name to cast media",
Expand All @@ -657,7 +711,7 @@ async def async_play_media(
app_name = app_data.pop("app_name")
try:
await self.hass.async_add_executor_job(
quick_play, chromecast, app_name, app_data
self._quick_play, app_name, app_data
)
except NotImplementedError:
_LOGGER.error("App %s not supported", app_name)
Expand Down Expand Up @@ -727,7 +781,7 @@ async def async_play_media(
app_data,
)
await self.hass.async_add_executor_job(
quick_play, chromecast, "default_media_receiver", app_data
self._quick_play, "default_media_receiver", app_data
)

def _media_status(self):
Expand Down