diff --git a/.coveragerc b/.coveragerc index b5939bafec6a8c..47e3cfc7c3bb55 100644 --- a/.coveragerc +++ b/.coveragerc @@ -314,6 +314,7 @@ omit = homeassistant/components/iaqualink/light.py homeassistant/components/iaqualink/sensor.py homeassistant/components/iaqualink/switch.py + homeassistant/components/icloud/__init__.py homeassistant/components/icloud/device_tracker.py homeassistant/components/izone/climate.py homeassistant/components/izone/discovery.py diff --git a/CODEOWNERS b/CODEOWNERS index ee6a8cd169cb62..47b9e48b9e96cb 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -144,6 +144,7 @@ homeassistant/components/huawei_lte/* @scop homeassistant/components/huawei_router/* @abmantis homeassistant/components/hue/* @balloob homeassistant/components/iaqualink/* @flz +homeassistant/components/icloud/* @Quentame homeassistant/components/ign_sismologia/* @exxamalte homeassistant/components/incomfort/* @zxdavb homeassistant/components/influxdb/* @fabaff diff --git a/homeassistant/components/icloud/.translations/en.json b/homeassistant/components/icloud/.translations/en.json new file mode 100644 index 00000000000000..581017593566ca --- /dev/null +++ b/homeassistant/components/icloud/.translations/en.json @@ -0,0 +1,38 @@ +{ + "config": { + "abort": { + "username_exists": "Account already configured" + }, + "error": { + "login": "Login error: please check your email & password", + "send_verification_code": "Failed to send verification code", + "username_exists": "Account already configured", + "validate_verification_code": "Failed to verify your verification code, choose a trust device and start the verification again" + }, + "step": { + "trusted_device": { + "data": { + "trusted_device": "Trusted device" + }, + "description": "Select your trusted device", + "title": "iCloud trusted device" + }, + "user": { + "data": { + "password": "Password", + "username": "Email" + }, + "description": "Enter your credentials", + "title": "iCloud credentials" + }, + "verification_code": { + "data": { + "verification_code": "Verification code" + }, + "description": "Please enter the verification code you just received from iCloud", + "title": "iCloud verification code" + } + }, + "title": "Apple iCloud" + } +} \ No newline at end of file diff --git a/homeassistant/components/icloud/__init__.py b/homeassistant/components/icloud/__init__.py index 1169104c99d9a3..6c4d6d9bdcc747 100644 --- a/homeassistant/components/icloud/__init__.py +++ b/homeassistant/components/icloud/__init__.py @@ -1 +1,607 @@ -"""The icloud component.""" +"""The iCloud component.""" +from datetime import timedelta +import logging +import operator +from typing import Dict + +from pyicloud import PyiCloudService +from pyicloud.exceptions import PyiCloudFailedLoginException, PyiCloudNoDevicesException +from pyicloud.services.findmyiphone import AppleDevice +import voluptuous as vol + +from homeassistant.components.zone import async_active_zone +from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry +from homeassistant.const import ATTR_ATTRIBUTION, CONF_PASSWORD, CONF_USERNAME +import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.dispatcher import dispatcher_send +from homeassistant.helpers.event import track_point_in_utc_time +from homeassistant.helpers.typing import ConfigType, HomeAssistantType, ServiceDataType +from homeassistant.util import slugify +from homeassistant.util.async_ import run_callback_threadsafe +from homeassistant.util.dt import utcnow +from homeassistant.util.location import distance + +from .const import ( + CONF_ACCOUNT_NAME, + CONF_GPS_ACCURACY_THRESHOLD, + CONF_MAX_INTERVAL, + DEFAULT_GPS_ACCURACY_THRESHOLD, + DEFAULT_MAX_INTERVAL, + DOMAIN, + ICLOUD_COMPONENTS, + TRACKER_UPDATE, + DEVICE_BATTERY_LEVEL, + DEVICE_BATTERY_STATUS, + DEVICE_CLASS, + DEVICE_DISPLAY_NAME, + DEVICE_ID, + DEVICE_LOCATION, + DEVICE_LOCATION_LATITUDE, + DEVICE_LOCATION_LONGITUDE, + DEVICE_LOST_MODE_CAPABLE, + DEVICE_LOW_POWER_MODE, + DEVICE_NAME, + DEVICE_PERSON_ID, + DEVICE_RAW_DEVICE_MODEL, + DEVICE_STATUS, + DEVICE_STATUS_SET, + DEVICE_STATUS_CODES, +) + +ATTRIBUTION = "Data provided by Apple iCloud" + +# entity attributes +ATTR_ACCOUNT_FETCH_INTERVAL = "account_fetch_interval" +ATTR_BATTERY = "battery" +ATTR_BATTERY_STATUS = "battery_status" +ATTR_DEVICE_NAME = "device_name" +ATTR_DEVICE_STATUS = "device_status" +ATTR_LOW_POWER_MODE = "low_power_mode" +ATTR_OWNER_NAME = "owner_fullname" + +# services +SERVICE_ICLOUD_PLAY_SOUND = "play_sound" +SERVICE_ICLOUD_DISPLAY_MESSAGE = "display_message" +SERVICE_ICLOUD_LOST_DEVICE = "lost_device" +SERVICE_ICLOUD_UPDATE = "update" +SERVICE_ICLOUD_RESET = "reset" +ATTR_ACCOUNT = "account" +ATTR_LOST_DEVICE_MESSAGE = "message" +ATTR_LOST_DEVICE_NUMBER = "number" +ATTR_LOST_DEVICE_SOUND = "sound" + +SERVICE_SCHEMA = vol.Schema({vol.Optional(ATTR_ACCOUNT): cv.string}) + +SERVICE_SCHEMA_PLAY_SOUND = vol.Schema( + {vol.Required(ATTR_ACCOUNT): cv.string, vol.Required(ATTR_DEVICE_NAME): cv.string} +) + +SERVICE_SCHEMA_DISPLAY_MESSAGE = vol.Schema( + { + vol.Required(ATTR_ACCOUNT): cv.string, + vol.Required(ATTR_DEVICE_NAME): cv.string, + vol.Required(ATTR_LOST_DEVICE_MESSAGE): cv.string, + vol.Optional(ATTR_LOST_DEVICE_SOUND): cv.boolean, + } +) + +SERVICE_SCHEMA_LOST_DEVICE = vol.Schema( + { + vol.Required(ATTR_ACCOUNT): cv.string, + vol.Required(ATTR_DEVICE_NAME): cv.string, + vol.Required(ATTR_LOST_DEVICE_NUMBER): cv.string, + vol.Required(ATTR_LOST_DEVICE_MESSAGE): cv.string, + } +) + +ACCOUNT_SCHEMA = vol.Schema( + { + vol.Required(CONF_USERNAME): cv.string, + vol.Required(CONF_PASSWORD): cv.string, + vol.Optional(CONF_ACCOUNT_NAME): cv.string, + vol.Optional(CONF_MAX_INTERVAL, default=DEFAULT_MAX_INTERVAL): cv.positive_int, + vol.Optional( + CONF_GPS_ACCURACY_THRESHOLD, default=DEFAULT_GPS_ACCURACY_THRESHOLD + ): cv.positive_int, + } +) + +CONFIG_SCHEMA = vol.Schema( + {DOMAIN: vol.Schema(vol.All(cv.ensure_list, [ACCOUNT_SCHEMA]))}, + extra=vol.ALLOW_EXTRA, +) + +_LOGGER = logging.getLogger(__name__) + + +async def async_setup(hass: HomeAssistantType, config: ConfigType) -> bool: + """Set up iCloud from legacy config file.""" + + conf = config.get(DOMAIN) + if conf is None: + return True + + for account_conf in conf: + hass.async_create_task( + hass.config_entries.flow.async_init( + DOMAIN, context={"source": SOURCE_IMPORT}, data=account_conf.copy() + ) + ) + + return True + + +async def async_setup_entry(hass: HomeAssistantType, entry: ConfigEntry) -> bool: + """Set up an iCloud account from a config entry.""" + + hass.data.setdefault(DOMAIN, {}) + + username = entry.data[CONF_USERNAME] + password = entry.data[CONF_PASSWORD] + account_name = entry.data.get(CONF_ACCOUNT_NAME) + max_interval = entry.data[CONF_MAX_INTERVAL] + gps_accuracy_threshold = entry.data[CONF_GPS_ACCURACY_THRESHOLD] + + account = IcloudAccount( + hass, username, password, account_name, max_interval, gps_accuracy_threshold + ) + await hass.async_add_executor_job(account.reset_account) + hass.data[DOMAIN][username] = account + + for component in ICLOUD_COMPONENTS: + hass.async_create_task( + hass.config_entries.async_forward_entry_setup(entry, component) + ) + + def play_sound(service: ServiceDataType) -> None: + """Play sound on the device.""" + account = service.data[ATTR_ACCOUNT] + device_name = service.data.get(ATTR_DEVICE_NAME) + device_name = slugify(device_name.replace(" ", "", 99)) + + for device in _get_account(account).get_devices_with_name(device_name): + device.play_sound() + + def display_message(service: ServiceDataType) -> None: + """Display a message on the device.""" + account = service.data[ATTR_ACCOUNT] + device_name = service.data.get(ATTR_DEVICE_NAME) + device_name = slugify(device_name.replace(" ", "", 99)) + message = service.data.get(ATTR_LOST_DEVICE_MESSAGE) + sound = service.data.get(ATTR_LOST_DEVICE_SOUND, False) + + for device in _get_account(account).get_devices_with_name(device_name): + device.display_message(message, sound) + + def lost_device(service: ServiceDataType) -> None: + """Make the device in lost state.""" + account = service.data[ATTR_ACCOUNT] + device_name = service.data.get(ATTR_DEVICE_NAME) + device_name = slugify(device_name.replace(" ", "", 99)) + number = service.data.get(ATTR_LOST_DEVICE_NUMBER) + message = service.data.get(ATTR_LOST_DEVICE_MESSAGE) + + for device in _get_account(account).get_devices_with_name(device_name): + device.lost_device(number, message) + + def update_account(service: ServiceDataType) -> None: + """Call the update function of an iCloud account.""" + account = service.data.get(ATTR_ACCOUNT) + + if account is None: + for account in hass.data[DOMAIN].values(): + account.keep_alive() + else: + _get_account(account).keep_alive() + + def reset_account(service: ServiceDataType) -> None: + """Reset an iCloud account.""" + account = service.data.get(ATTR_ACCOUNT) + + if account is None: + for account in hass.data[DOMAIN].values(): + account.reset_account() + else: + _get_account(account).reset_account() + + def _get_account(account_identifier: str) -> any: + if account_identifier is None: + return None + + icloud_account = hass.data[DOMAIN].get(account_identifier, None) + if icloud_account is None: + for account in hass.data[DOMAIN].values(): + if account.name == account_identifier: + icloud_account = account + + if icloud_account is None: + raise Exception( + "No iCloud account with username or name " + account_identifier + ) + return icloud_account + + hass.services.async_register( + DOMAIN, SERVICE_ICLOUD_PLAY_SOUND, play_sound, schema=SERVICE_SCHEMA_PLAY_SOUND + ) + + hass.services.async_register( + DOMAIN, + SERVICE_ICLOUD_DISPLAY_MESSAGE, + display_message, + schema=SERVICE_SCHEMA_DISPLAY_MESSAGE, + ) + + hass.services.async_register( + DOMAIN, + SERVICE_ICLOUD_LOST_DEVICE, + lost_device, + schema=SERVICE_SCHEMA_LOST_DEVICE, + ) + + hass.services.async_register( + DOMAIN, SERVICE_ICLOUD_UPDATE, update_account, schema=SERVICE_SCHEMA + ) + + hass.services.async_register( + DOMAIN, SERVICE_ICLOUD_RESET, reset_account, schema=SERVICE_SCHEMA + ) + + return True + + +class IcloudAccount: + """Representation of an iCloud account.""" + + def __init__( + self, + hass: HomeAssistantType, + username: str, + password: str, + account_name: str, + max_interval: int, + gps_accuracy_threshold: int, + ): + """Initialize an iCloud account.""" + self.hass = hass + self._username = username + self._password = password + self._name = account_name or slugify(username.partition("@")[0]) + self._fetch_interval = max_interval + self._max_interval = max_interval + self._gps_accuracy_threshold = gps_accuracy_threshold + + self.api = None + self._owner_fullname = None + self._family_members_fullname = {} + self._devices = {} + + self.unsub_device_tracker = None + + def reset_account(self): + """Reset an iCloud account.""" + icloud_dir = self.hass.config.path("icloud") + + try: + self.api = PyiCloudService(self._username, self._password, icloud_dir) + except PyiCloudFailedLoginException as error: + self.api = None + _LOGGER.error("Error logging into iCloud Service: %s", error) + return + + user_info = None + try: + # Gets device owners infos + user_info = self.api.devices.response["userInfo"] + except PyiCloudNoDevicesException: + _LOGGER.error("No iCloud Devices found") + + self._owner_fullname = f"{user_info['firstName']} {user_info['lastName']}" + + self._family_members_fullname = {} + for prs_id, member in user_info["membersInfo"].items(): + self._family_members_fullname[ + prs_id + ] = f"{member['firstName']} {member['lastName']}" + + self._devices = {} + self.update_devices() + + def update_devices(self) -> None: + """Update iCloud devices.""" + if self.api is None: + return + + api_devices = {} + try: + api_devices = self.api.devices + except PyiCloudNoDevicesException: + _LOGGER.error("No iCloud Devices found") + + # Gets devices infos + for device in api_devices: + status = device.status(DEVICE_STATUS_SET) + device_id = status[DEVICE_ID] + device_name = status[DEVICE_NAME] + + if self._devices.get(device_id, None) is not None: + # Seen device -> updating + _LOGGER.debug("Updating iCloud device: %s", device_name) + self._devices[device_id].update(status) + else: + # New device, should be unique + _LOGGER.debug( + "Adding iCloud device: %s [model: %s]", + device_name, + status[DEVICE_RAW_DEVICE_MODEL], + ) + self._devices[device_id] = IcloudDevice(self, device, status) + self._devices[device_id].update(status) + + dispatcher_send(self.hass, TRACKER_UPDATE) + self._fetch_interval = self._determine_interval() + track_point_in_utc_time( + self.hass, + self.keep_alive, + utcnow() + timedelta(minutes=self._fetch_interval), + ) + + def _determine_interval(self) -> int: + """Calculate new interval between two API fetch (in minutes).""" + intervals = {} + for device in self._devices.values(): + if device.location is None: + continue + + current_zone = run_callback_threadsafe( + self.hass.loop, + async_active_zone, + self.hass, + device.location[DEVICE_LOCATION_LATITUDE], + device.location[DEVICE_LOCATION_LONGITUDE], + ).result() + + if current_zone is not None: + intervals[device.name] = self._max_interval + continue + + zones = ( + self.hass.states.get(entity_id) + for entity_id in sorted(self.hass.states.entity_ids("zone")) + ) + + distances = [] + for zone_state in zones: + zone_state_lat = zone_state.attributes[DEVICE_LOCATION_LATITUDE] + zone_state_long = zone_state.attributes[DEVICE_LOCATION_LONGITUDE] + zone_distance = distance( + device.location[DEVICE_LOCATION_LATITUDE], + device.location[DEVICE_LOCATION_LONGITUDE], + zone_state_lat, + zone_state_long, + ) + distances.append(round(zone_distance / 1000, 1)) + + if not distances: + continue + mindistance = min(distances) + + # Calculate out how long it would take for the device to drive + # to the nearest zone at 120 km/h: + interval = round(mindistance / 2, 0) + + # Never poll more than once per minute + interval = max(interval, 1) + + if interval > 180: + # Three hour drive? + # This is far enough that they might be flying + interval = self._max_interval + + if ( + device.battery_level is not None + and device.battery_level <= 33 + and mindistance > 3 + ): + # Low battery - let's check half as often + interval = interval * 2 + + intervals[device.name] = interval + + return max( + int(min(intervals.items(), key=operator.itemgetter(1))[1]), + self._max_interval, + ) + + def keep_alive(self, now=None) -> None: + """Keep the API alive.""" + if self.api is None: + self.reset_account() + + if self.api is None: + return + + self.api.authenticate() + self.update_devices() + + def get_devices_with_name(self, name: str) -> [any]: + """Get devices by name.""" + result = [] + name_slug = slugify(name.replace(" ", "", 99)) + for device in self.devices.values(): + if slugify(device.name.replace(" ", "", 99)) == name_slug: + result.append(device) + if not result: + raise Exception("No device with name " + name) + return result + + @property + def name(self) -> str: + """Return the account name.""" + return self._name + + @property + def username(self) -> str: + """Return the account username.""" + return self._username + + @property + def owner_fullname(self) -> str: + """Return the account owner fullname.""" + return self._owner_fullname + + @property + def family_members_fullname(self) -> Dict[str, str]: + """Return the account family members fullname.""" + return self._family_members_fullname + + @property + def fetch_interval(self) -> int: + """Return the account fetch interval.""" + return self._fetch_interval + + @property + def devices(self) -> Dict[str, any]: + """Return the account devices.""" + return self._devices + + +class IcloudDevice: + """Representation of a iCloud device.""" + + def __init__(self, account: IcloudAccount, device: AppleDevice, status): + """Initialize the iCloud device.""" + self._account = account + account_name = account.name + + self._device = device + self._status = status + + self._name = self._status[DEVICE_NAME] + self._device_id = self._status[DEVICE_ID] + self._device_class = self._status[DEVICE_CLASS] + self._device_model = self._status[DEVICE_DISPLAY_NAME] + + if self._status[DEVICE_PERSON_ID]: + owner_fullname = account.family_members_fullname[ + self._status[DEVICE_PERSON_ID] + ] + else: + owner_fullname = account.owner_fullname + + self._battery_level = None + self._battery_status = None + self._location = None + + self._attrs = { + ATTR_ATTRIBUTION: ATTRIBUTION, + CONF_ACCOUNT_NAME: account_name, + ATTR_ACCOUNT_FETCH_INTERVAL: self._account.fetch_interval, + ATTR_DEVICE_NAME: self._device_model, + ATTR_DEVICE_STATUS: None, + ATTR_OWNER_NAME: owner_fullname, + } + + def update(self, status) -> None: + """Update the iCloud device.""" + self._status = status + + self._status[ATTR_ACCOUNT_FETCH_INTERVAL] = self._account.fetch_interval + + device_status = DEVICE_STATUS_CODES.get(self._status[DEVICE_STATUS], "error") + self._attrs[ATTR_DEVICE_STATUS] = device_status + + if self._status[DEVICE_BATTERY_STATUS] != "Unknown": + self._battery_level = int(self._status.get(DEVICE_BATTERY_LEVEL, 0) * 100) + self._battery_status = self._status[DEVICE_BATTERY_STATUS] + low_power_mode = self._status[DEVICE_LOW_POWER_MODE] + + self._attrs[ATTR_BATTERY] = self._battery_level + self._attrs[ATTR_BATTERY_STATUS] = self._battery_status + self._attrs[ATTR_LOW_POWER_MODE] = low_power_mode + + if ( + self._status[DEVICE_LOCATION] + and self._status[DEVICE_LOCATION][DEVICE_LOCATION_LATITUDE] + ): + location = self._status[DEVICE_LOCATION] + self._location = location + + def play_sound(self) -> None: + """Play sound on the device.""" + if self._account.api is None: + return + + self._account.api.authenticate() + _LOGGER.debug("Playing sound for %s", self.name) + self.device.play_sound() + + def display_message(self, message: str, sound: bool = False) -> None: + """Display a message on the device.""" + if self._account.api is None: + return + + self._account.api.authenticate() + _LOGGER.debug("Displaying message for %s", self.name) + self.device.display_message("Subject not working", message, sound) + + def lost_device(self, number: str, message: str) -> None: + """Make the device in lost state.""" + if self._account.api is None: + return + + self._account.api.authenticate() + if self._status[DEVICE_LOST_MODE_CAPABLE]: + _LOGGER.debug("Make device lost for %s", self.name) + self.device.lost_device(number, message, None) + else: + _LOGGER.error("Cannot make device lost for %s", self.name) + + @property + def unique_id(self) -> str: + """Return a unique ID.""" + return self._device_id + + @property + def dev_id(self) -> str: + """Return the device ID.""" + return self._device_id + + @property + def name(self) -> str: + """Return the Apple device name.""" + return self._name + + @property + def device(self) -> AppleDevice: + """Return the Apple device.""" + return self._device + + @property + def device_class(self) -> str: + """Return the Apple device class.""" + return self._device_class + + @property + def device_model(self) -> str: + """Return the Apple device model.""" + return self._device_model + + @property + def battery_level(self) -> int: + """Return the Apple device battery level.""" + return self._battery_level + + @property + def battery_status(self) -> str: + """Return the Apple device battery status.""" + return self._battery_status + + @property + def location(self) -> Dict[str, any]: + """Return the Apple device location.""" + return self._location + + @property + def state_attributes(self) -> Dict[str, any]: + """Return the attributes.""" + return self._attrs diff --git a/homeassistant/components/icloud/config_flow.py b/homeassistant/components/icloud/config_flow.py new file mode 100644 index 00000000000000..cb871ddd3724bd --- /dev/null +++ b/homeassistant/components/icloud/config_flow.py @@ -0,0 +1,224 @@ +"""Config flow to configure the iCloud integration.""" +import logging +import os + +import voluptuous as vol +from pyicloud import PyiCloudService +from pyicloud.exceptions import PyiCloudException, PyiCloudFailedLoginException + +from homeassistant import config_entries +from homeassistant.const import CONF_PASSWORD, CONF_USERNAME +from homeassistant.util import slugify + +# pylint: disable=unused-import +from .const import ( + CONF_ACCOUNT_NAME, + CONF_GPS_ACCURACY_THRESHOLD, + CONF_MAX_INTERVAL, + DEFAULT_MAX_INTERVAL, + DEFAULT_GPS_ACCURACY_THRESHOLD, + DOMAIN, # noqa +) + +CONF_TRUSTED_DEVICE = "trusted_device" +CONF_VERIFICATION_CODE = "verification_code" + +_LOGGER = logging.getLogger(__name__) + + +class IcloudFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): + """Handle a iCloud config flow.""" + + VERSION = 1 + CONNECTION_CLASS = config_entries.CONN_CLASS_CLOUD_POLL + + def __init__(self): + """Initialize iCloud config flow.""" + self.api = None + self._username = None + self._password = None + self._account_name = None + self._max_interval = None + self._gps_accuracy_threshold = None + + self._trusted_device = None + self._verification_code = None + + def _configuration_exists(self, username: str, account_name: str) -> bool: + """Return True if username or account_name exists in configuration.""" + for entry in self._async_current_entries(): + if ( + entry.data[CONF_USERNAME] == username + or entry.data.get(CONF_ACCOUNT_NAME) == account_name + or slugify(entry.data[CONF_USERNAME].partition("@")[0]) == account_name + ): + return True + return False + + async def _show_setup_form(self, user_input=None, errors=None): + """Show the setup form to the user.""" + + if user_input is None: + user_input = {} + + return self.async_show_form( + step_id="user", + data_schema=vol.Schema( + { + vol.Required( + CONF_USERNAME, default=user_input.get(CONF_USERNAME, "") + ): str, + vol.Required( + CONF_PASSWORD, default=user_input.get(CONF_PASSWORD, "") + ): str, + } + ), + errors=errors or {}, + ) + + async def async_step_user(self, user_input=None): + """Handle a flow initiated by the user.""" + errors = {} + + icloud_dir = self.hass.config.path("icloud") + if not os.path.exists(icloud_dir): + await self.hass.async_add_executor_job(os.makedirs, icloud_dir) + + if user_input is None: + return await self._show_setup_form(user_input, errors) + + self._username = user_input[CONF_USERNAME] + self._password = user_input[CONF_PASSWORD] + self._account_name = user_input.get(CONF_ACCOUNT_NAME) + self._max_interval = user_input.get(CONF_MAX_INTERVAL, DEFAULT_MAX_INTERVAL) + self._gps_accuracy_threshold = user_input.get( + CONF_GPS_ACCURACY_THRESHOLD, DEFAULT_GPS_ACCURACY_THRESHOLD + ) + + if self._configuration_exists(self._username, self._account_name): + errors[CONF_USERNAME] = "username_exists" + return await self._show_setup_form(user_input, errors) + + try: + self.api = await self.hass.async_add_executor_job( + PyiCloudService, self._username, self._password, icloud_dir + ) + except PyiCloudFailedLoginException as error: + _LOGGER.error("Error logging into iCloud service: %s", error) + self.api = None + errors[CONF_USERNAME] = "login" + return await self._show_setup_form(user_input, errors) + + if self.api.requires_2fa: + return await self.async_step_trusted_device() + + return self.async_create_entry( + title=self._username, + data={ + CONF_USERNAME: self._username, + CONF_PASSWORD: self._password, + CONF_ACCOUNT_NAME: self._account_name, + CONF_MAX_INTERVAL: self._max_interval, + CONF_GPS_ACCURACY_THRESHOLD: self._gps_accuracy_threshold, + }, + ) + + async def async_step_import(self, user_input): + """Import a config entry.""" + if self._configuration_exists( + user_input[CONF_USERNAME], user_input.get(CONF_ACCOUNT_NAME) + ): + return self.async_abort(reason="username_exists") + + return await self.async_step_user(user_input) + + async def async_step_trusted_device(self, user_input=None, errors=None): + """We need a trusted device.""" + if errors is None: + errors = {} + + trusted_devices = {} + devices = self.api.trusted_devices + for i, device in enumerate(devices): + trusted_devices[i] = device.get( + "deviceName", f"SMS to {device.get('phoneNumber')}" + ) + + if user_input is None: + return await self._show_trusted_device_form( + trusted_devices, user_input, errors + ) + + self._trusted_device = self.api.trusted_devices[ + int(user_input[CONF_TRUSTED_DEVICE]) + ] + + if not self.api.send_verification_code(self._trusted_device): + _LOGGER.error("Failed to send verification code") + self._trusted_device = None + errors[CONF_TRUSTED_DEVICE] = "send_verification_code" + + return await self._show_trusted_device_form( + trusted_devices, user_input, errors + ) + + return await self.async_step_verification_code() + + async def _show_trusted_device_form( + self, trusted_devices, user_input=None, errors=None + ): + """Show the trusted_device form to the user.""" + + return self.async_show_form( + step_id=CONF_TRUSTED_DEVICE, + data_schema=vol.Schema( + { + vol.Required(CONF_TRUSTED_DEVICE): vol.All( + vol.Coerce(int), vol.In(trusted_devices) + ) + } + ), + errors=errors or {}, + ) + + async def async_step_verification_code(self, user_input=None): + """Ask the verification code to the user.""" + errors = {} + + if user_input is None: + return await self._show_verification_code_form(user_input) + + self._verification_code = user_input[CONF_VERIFICATION_CODE] + + try: + if not self.api.validate_verification_code( + self._trusted_device, self._verification_code + ): + raise PyiCloudException("The code you entered is not valid.") + except PyiCloudException as error: + # Reset to the initial 2FA state to allow the user to retry + _LOGGER.error("Failed to verify verification code: %s", error) + self._trusted_device = None + self._verification_code = None + errors["base"] = "validate_verification_code" + + return await self.async_step_trusted_device(None, errors) + + return await self.async_step_user( + { + CONF_USERNAME: self._username, + CONF_PASSWORD: self._password, + CONF_ACCOUNT_NAME: self._account_name, + CONF_MAX_INTERVAL: self._max_interval, + CONF_GPS_ACCURACY_THRESHOLD: self._gps_accuracy_threshold, + } + ) + + async def _show_verification_code_form(self, user_input=None): + """Show the verification_code form to the user.""" + + return self.async_show_form( + step_id=CONF_VERIFICATION_CODE, + data_schema=vol.Schema({vol.Required(CONF_VERIFICATION_CODE): str}), + errors=None, + ) diff --git a/homeassistant/components/icloud/const.py b/homeassistant/components/icloud/const.py new file mode 100644 index 00000000000000..1ecf5c124cefbc --- /dev/null +++ b/homeassistant/components/icloud/const.py @@ -0,0 +1,81 @@ +"""iCloud component constants.""" + +DOMAIN = "icloud" +TRACKER_UPDATE = f"{DOMAIN}_tracker_update" + +CONF_ACCOUNT_NAME = "account_name" +CONF_MAX_INTERVAL = "max_interval" +CONF_GPS_ACCURACY_THRESHOLD = "gps_accuracy_threshold" + +DEFAULT_MAX_INTERVAL = 30 # min +DEFAULT_GPS_ACCURACY_THRESHOLD = 500 # meters + +# Next PR will add sensor +ICLOUD_COMPONENTS = ["device_tracker"] + +# pyicloud.AppleDevice status +DEVICE_BATTERY_LEVEL = "batteryLevel" +DEVICE_BATTERY_STATUS = "batteryStatus" +DEVICE_CLASS = "deviceClass" +DEVICE_DISPLAY_NAME = "deviceDisplayName" +DEVICE_ID = "id" +DEVICE_LOCATION = "location" +DEVICE_LOCATION_HORIZONTAL_ACCURACY = "horizontalAccuracy" +DEVICE_LOCATION_LATITUDE = "latitude" +DEVICE_LOCATION_LONGITUDE = "longitude" +DEVICE_LOST_MODE_CAPABLE = "lostModeCapable" +DEVICE_LOW_POWER_MODE = "lowPowerMode" +DEVICE_NAME = "name" +DEVICE_PERSON_ID = "prsId" +DEVICE_RAW_DEVICE_MODEL = "rawDeviceModel" +DEVICE_STATUS = "deviceStatus" + +DEVICE_STATUS_SET = [ + "features", + "maxMsgChar", + "darkWake", + "fmlyShare", + DEVICE_STATUS, + "remoteLock", + "activationLocked", + DEVICE_CLASS, + DEVICE_ID, + "deviceModel", + DEVICE_RAW_DEVICE_MODEL, + "passcodeLength", + "canWipeAfterLock", + "trackingInfo", + DEVICE_LOCATION, + "msg", + DEVICE_BATTERY_LEVEL, + "remoteWipe", + "thisDevice", + "snd", + DEVICE_PERSON_ID, + "wipeInProgress", + DEVICE_LOW_POWER_MODE, + "lostModeEnabled", + "isLocating", + DEVICE_LOST_MODE_CAPABLE, + "mesg", + DEVICE_NAME, + DEVICE_BATTERY_STATUS, + "lockedTimestamp", + "lostTimestamp", + "locationCapable", + DEVICE_DISPLAY_NAME, + "lostDevice", + "deviceColor", + "wipedTimestamp", + "modelDisplayName", + "locationEnabled", + "isMac", + "locFoundEnabled", +] + +DEVICE_STATUS_CODES = { + "200": "online", + "201": "offline", + "203": "pending", + "204": "unregistered", +} diff --git a/homeassistant/components/icloud/device_tracker.py b/homeassistant/components/icloud/device_tracker.py index 2ecf904314fe4b..bb5d23fd6b5e49 100644 --- a/homeassistant/components/icloud/device_tracker.py +++ b/homeassistant/components/icloud/device_tracker.py @@ -1,547 +1,133 @@ -"""Platform that supports scanning iCloud.""" +"""Support for tracking for iCloud devices.""" import logging -import random -import os -import voluptuous as vol +from homeassistant.components.device_tracker import SOURCE_TYPE_GPS +from homeassistant.const import CONF_USERNAME +from homeassistant.components.device_tracker.config_entry import TrackerEntity +from homeassistant.helpers.dispatcher import async_dispatcher_connect +from homeassistant.helpers.typing import HomeAssistantType -from homeassistant.const import CONF_USERNAME, CONF_PASSWORD -from homeassistant.components.device_tracker import PLATFORM_SCHEMA -from homeassistant.components.device_tracker.const import ( + +from . import IcloudDevice +from .const import ( DOMAIN, - ATTR_ATTRIBUTES, - ENTITY_ID_FORMAT, + TRACKER_UPDATE, + DEVICE_LOCATION_HORIZONTAL_ACCURACY, + DEVICE_LOCATION_LATITUDE, + DEVICE_LOCATION_LONGITUDE, ) -from homeassistant.components.device_tracker.legacy import DeviceScanner -from homeassistant.components.zone import async_active_zone -from homeassistant.helpers.event import track_utc_time_change -import homeassistant.helpers.config_validation as cv -from homeassistant.util import slugify -import homeassistant.util.dt as dt_util -from homeassistant.util.location import distance -from homeassistant.util.async_ import run_callback_threadsafe _LOGGER = logging.getLogger(__name__) -CONF_ACCOUNTNAME = "account_name" -CONF_MAX_INTERVAL = "max_interval" -CONF_GPS_ACCURACY_THRESHOLD = "gps_accuracy_threshold" - -# entity attributes -ATTR_ACCOUNTNAME = "account_name" -ATTR_INTERVAL = "interval" -ATTR_DEVICENAME = "device_name" -ATTR_BATTERY = "battery" -ATTR_DISTANCE = "distance" -ATTR_DEVICESTATUS = "device_status" -ATTR_LOWPOWERMODE = "low_power_mode" -ATTR_BATTERYSTATUS = "battery_status" - -ICLOUDTRACKERS = {} - -_CONFIGURING = {} - -DEVICESTATUSSET = [ - "features", - "maxMsgChar", - "darkWake", - "fmlyShare", - "deviceStatus", - "remoteLock", - "activationLocked", - "deviceClass", - "id", - "deviceModel", - "rawDeviceModel", - "passcodeLength", - "canWipeAfterLock", - "trackingInfo", - "location", - "msg", - "batteryLevel", - "remoteWipe", - "thisDevice", - "snd", - "prsId", - "wipeInProgress", - "lowPowerMode", - "lostModeEnabled", - "isLocating", - "lostModeCapable", - "mesg", - "name", - "batteryStatus", - "lockedTimestamp", - "lostTimestamp", - "locationCapable", - "deviceDisplayName", - "lostDevice", - "deviceColor", - "wipedTimestamp", - "modelDisplayName", - "locationEnabled", - "isMac", - "locFoundEnabled", -] - -DEVICESTATUSCODES = { - "200": "online", - "201": "offline", - "203": "pending", - "204": "unregistered", -} - -SERVICE_SCHEMA = vol.Schema( - { - vol.Optional(ATTR_ACCOUNTNAME): vol.All(cv.ensure_list, [cv.slugify]), - vol.Optional(ATTR_DEVICENAME): cv.slugify, - vol.Optional(ATTR_INTERVAL): cv.positive_int, - } -) - -PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( - { - vol.Required(CONF_USERNAME): cv.string, - vol.Required(CONF_PASSWORD): cv.string, - vol.Optional(ATTR_ACCOUNTNAME): cv.slugify, - vol.Optional(CONF_MAX_INTERVAL, default=30): cv.positive_int, - vol.Optional(CONF_GPS_ACCURACY_THRESHOLD, default=1000): cv.positive_int, - } -) - - -def setup_scanner(hass, config: dict, see, discovery_info=None): - """Set up the iCloud Scanner.""" - username = config.get(CONF_USERNAME) - password = config.get(CONF_PASSWORD) - account = config.get(CONF_ACCOUNTNAME, slugify(username.partition("@")[0])) - max_interval = config.get(CONF_MAX_INTERVAL) - gps_accuracy_threshold = config.get(CONF_GPS_ACCURACY_THRESHOLD) - - icloudaccount = Icloud( - hass, username, password, account, max_interval, gps_accuracy_threshold, see - ) - - if icloudaccount.api is not None: - ICLOUDTRACKERS[account] = icloudaccount - - else: - _LOGGER.error("No ICLOUDTRACKERS added") - return False - def lost_iphone(call): - """Call the lost iPhone function if the device is found.""" - accounts = call.data.get(ATTR_ACCOUNTNAME, ICLOUDTRACKERS) - devicename = call.data.get(ATTR_DEVICENAME) - for account in accounts: - if account in ICLOUDTRACKERS: - ICLOUDTRACKERS[account].lost_iphone(devicename) +async def async_setup_scanner( + hass: HomeAssistantType, config, see, discovery_info=None +): + """Old way of setting up the iCloud tracker.""" + pass - hass.services.register( - DOMAIN, "icloud_lost_iphone", lost_iphone, schema=SERVICE_SCHEMA - ) - def update_icloud(call): - """Call the update function of an iCloud account.""" - accounts = call.data.get(ATTR_ACCOUNTNAME, ICLOUDTRACKERS) - devicename = call.data.get(ATTR_DEVICENAME) - for account in accounts: - if account in ICLOUDTRACKERS: - ICLOUDTRACKERS[account].update_icloud(devicename) +async def async_setup_entry(hass: HomeAssistantType, entry, async_add_entities): + """Configure a dispatcher connection based on a config entry.""" + username = entry.data[CONF_USERNAME] - hass.services.register( - DOMAIN, "icloud_update", update_icloud, schema=SERVICE_SCHEMA - ) + for device in hass.data[DOMAIN][username].devices.values(): + if device.location is None: + _LOGGER.debug("No position found for device %s", device.name) + continue - def reset_account_icloud(call): - """Reset an iCloud account.""" - accounts = call.data.get(ATTR_ACCOUNTNAME, ICLOUDTRACKERS) - for account in accounts: - if account in ICLOUDTRACKERS: - ICLOUDTRACKERS[account].reset_account_icloud() + _LOGGER.debug("Adding device_tracker for %s", device.name) - hass.services.register( - DOMAIN, "icloud_reset_account", reset_account_icloud, schema=SERVICE_SCHEMA - ) + async_add_entities([IcloudTrackerEntity(device)]) - def setinterval(call): - """Call the update function of an iCloud account.""" - accounts = call.data.get(ATTR_ACCOUNTNAME, ICLOUDTRACKERS) - interval = call.data.get(ATTR_INTERVAL) - devicename = call.data.get(ATTR_DEVICENAME) - for account in accounts: - if account in ICLOUDTRACKERS: - ICLOUDTRACKERS[account].setinterval(interval, devicename) - hass.services.register( - DOMAIN, "icloud_set_interval", setinterval, schema=SERVICE_SCHEMA - ) +class IcloudTrackerEntity(TrackerEntity): + """Represent a tracked device.""" - # Tells the bootstrapper that the component was successfully initialized - return True + def __init__(self, device: IcloudDevice): + """Set up the iCloud tracker entity.""" + self._device = device + self._unsub_dispatcher = None + @property + def unique_id(self): + """Return a unique ID.""" + return f"{self._device.unique_id}_tracker" -class Icloud(DeviceScanner): - """Representation of an iCloud account.""" + @property + def name(self): + """Return the name of the device.""" + return self._device.name - def __init__( - self, hass, username, password, name, max_interval, gps_accuracy_threshold, see - ): - """Initialize an iCloud account.""" - self.hass = hass - self.username = username - self.password = password - self.api = None - self.accountname = name - self.devices = {} - self.seen_devices = {} - self._overridestates = {} - self._intervals = {} - self._max_interval = max_interval - self._gps_accuracy_threshold = gps_accuracy_threshold - self.see = see - - self._trusted_device = None - self._verification_code = None - - self._attrs = {} - self._attrs[ATTR_ACCOUNTNAME] = name - - self.reset_account_icloud() - - randomseconds = random.randint(10, 59) - track_utc_time_change(self.hass, self.keep_alive, second=randomseconds) - - def reset_account_icloud(self): - """Reset an iCloud account.""" - from pyicloud import PyiCloudService - from pyicloud.exceptions import ( - PyiCloudFailedLoginException, - PyiCloudNoDevicesException, - ) - - icloud_dir = self.hass.config.path("icloud") - if not os.path.exists(icloud_dir): - os.makedirs(icloud_dir) - - try: - self.api = PyiCloudService( - self.username, self.password, cookie_directory=icloud_dir, verify=True - ) - except PyiCloudFailedLoginException as error: - self.api = None - _LOGGER.error("Error logging into iCloud Service: %s", error) - return - - try: - self.devices = {} - self._overridestates = {} - self._intervals = {} - for device in self.api.devices: - status = device.status(DEVICESTATUSSET) - _LOGGER.debug("Device Status is %s", status) - devicename = slugify(status["name"].replace(" ", "", 99)) - _LOGGER.info("Adding icloud device: %s", devicename) - if devicename in self.devices: - _LOGGER.error("Multiple devices with name: %s", devicename) - continue - self.devices[devicename] = device - self._intervals[devicename] = 1 - self._overridestates[devicename] = None - except PyiCloudNoDevicesException: - _LOGGER.error("No iCloud Devices found!") - - def icloud_trusted_device_callback(self, callback_data): - """Handle chosen trusted devices.""" - self._trusted_device = int(callback_data.get("trusted_device")) - self._trusted_device = self.api.trusted_devices[self._trusted_device] - - if not self.api.send_verification_code(self._trusted_device): - _LOGGER.error("Failed to send verification code") - self._trusted_device = None - return - - if self.accountname in _CONFIGURING: - request_id = _CONFIGURING.pop(self.accountname) - configurator = self.hass.components.configurator - configurator.request_done(request_id) - - # Trigger the next step immediately - self.icloud_need_verification_code() - - def icloud_need_trusted_device(self): - """We need a trusted device.""" - configurator = self.hass.components.configurator - if self.accountname in _CONFIGURING: - return - - devicesstring = "" - devices = self.api.trusted_devices - for i, device in enumerate(devices): - devicename = device.get( - "deviceName", "SMS to %s" % device.get("phoneNumber") - ) - devicesstring += f"{i}: {devicename};" - - _CONFIGURING[self.accountname] = configurator.request_config( - f"iCloud {self.accountname}", - self.icloud_trusted_device_callback, - description=( - "Please choose your trusted device by entering" - " the index from this list: " + devicesstring - ), - entity_picture="/static/images/config_icloud.png", - submit_caption="Confirm", - fields=[{"id": "trusted_device", "name": "Trusted Device"}], - ) + @property + def location_accuracy(self): + """Return the location accuracy of the device.""" + return self._device.location[DEVICE_LOCATION_HORIZONTAL_ACCURACY] - def icloud_verification_callback(self, callback_data): - """Handle the chosen trusted device.""" - from pyicloud.exceptions import PyiCloudException + @property + def latitude(self): + """Return latitude value of the device.""" + return self._device.location[DEVICE_LOCATION_LATITUDE] - self._verification_code = callback_data.get("code") + @property + def longitude(self): + """Return longitude value of the device.""" + return self._device.location[DEVICE_LOCATION_LONGITUDE] - try: - if not self.api.validate_verification_code( - self._trusted_device, self._verification_code - ): - raise PyiCloudException("Unknown failure") - except PyiCloudException as error: - # Reset to the initial 2FA state to allow the user to retry - _LOGGER.error("Failed to verify verification code: %s", error) - self._trusted_device = None - self._verification_code = None - - # Trigger the next step immediately - self.icloud_need_trusted_device() - - if self.accountname in _CONFIGURING: - request_id = _CONFIGURING.pop(self.accountname) - configurator = self.hass.components.configurator - configurator.request_done(request_id) - - def icloud_need_verification_code(self): - """Return the verification code.""" - configurator = self.hass.components.configurator - if self.accountname in _CONFIGURING: - return - - _CONFIGURING[self.accountname] = configurator.request_config( - f"iCloud {self.accountname}", - self.icloud_verification_callback, - description=("Please enter the validation code:"), - entity_picture="/static/images/config_icloud.png", - submit_caption="Confirm", - fields=[{"id": "code", "name": "code"}], - ) - - def keep_alive(self, now): - """Keep the API alive.""" - if self.api is None: - self.reset_account_icloud() - - if self.api is None: - return - - if self.api.requires_2fa: - from pyicloud.exceptions import PyiCloudException - - try: - if self._trusted_device is None: - self.icloud_need_trusted_device() - return - - if self._verification_code is None: - self.icloud_need_verification_code() - return - - self.api.authenticate() - if self.api.requires_2fa: - raise Exception("Unknown failure") - - self._trusted_device = None - self._verification_code = None - except PyiCloudException as error: - _LOGGER.error("Error setting up 2FA: %s", error) - else: - self.api.authenticate() - - currentminutes = dt_util.now().hour * 60 + dt_util.now().minute - try: - for devicename in self.devices: - interval = self._intervals.get(devicename, 1) - if (currentminutes % interval == 0) or ( - interval > 10 and currentminutes % interval in [2, 4] - ): - self.update_device(devicename) - except ValueError: - _LOGGER.debug("iCloud API returned an error") - - def determine_interval(self, devicename, latitude, longitude, battery): - """Calculate new interval.""" - currentzone = run_callback_threadsafe( - self.hass.loop, async_active_zone, self.hass, latitude, longitude - ).result() - - if ( - currentzone is not None - and currentzone == self._overridestates.get(devicename) - ) or (currentzone is None and self._overridestates.get(devicename) == "away"): - return + @property + def should_poll(self): + """No polling needed.""" + return False - zones = ( - self.hass.states.get(entity_id) - for entity_id in sorted(self.hass.states.entity_ids("zone")) + @property + def battery_level(self): + """Return the battery level of the device.""" + return self._device.battery_level + + @property + def source_type(self): + """Return the source type, eg gps or router, of the device.""" + return SOURCE_TYPE_GPS + + @property + def icon(self): + """Return the icon.""" + return icon_for_icloud_device(self._device) + + @property + def device_state_attributes(self): + """Return the device state attributes.""" + return self._device.state_attributes + + @property + def device_info(self): + """Return the device information.""" + return { + "identifiers": {(DOMAIN, self.unique_id)}, + "name": self.name, + "manufacturer": "Apple", + "model": self._device.device_model, + } + + async def async_added_to_hass(self): + """Register state update callback.""" + self._unsub_dispatcher = async_dispatcher_connect( + self.hass, TRACKER_UPDATE, self.async_write_ha_state ) - distances = [] - for zone_state in zones: - zone_state_lat = zone_state.attributes["latitude"] - zone_state_long = zone_state.attributes["longitude"] - zone_distance = distance( - latitude, longitude, zone_state_lat, zone_state_long - ) - distances.append(round(zone_distance / 1000, 1)) - - if distances: - mindistance = min(distances) - else: - mindistance = None + async def async_will_remove_from_hass(self): + """Clean up after entity before removal.""" + self._unsub_dispatcher() - self._overridestates[devicename] = None - if currentzone is not None: - self._intervals[devicename] = self._max_interval - return - - if mindistance is None: - return - - # Calculate out how long it would take for the device to drive to the - # nearest zone at 120 km/h: - interval = round(mindistance / 2, 0) - - # Never poll more than once per minute - interval = max(interval, 1) - - if interval > 180: - # Three hour drive? This is far enough that they might be flying - interval = 30 - - if battery is not None and battery <= 33 and mindistance > 3: - # Low battery - let's check half as often - interval = interval * 2 - - self._intervals[devicename] = interval - - def update_device(self, devicename): - """Update the device_tracker entity.""" - from pyicloud.exceptions import PyiCloudNoDevicesException - - # An entity will not be created by see() when track=false in - # 'known_devices.yaml', but we need to see() it at least once - entity = self.hass.states.get(ENTITY_ID_FORMAT.format(devicename)) - if entity is None and devicename in self.seen_devices: - return - attrs = {} - kwargs = {} - - if self.api is None: - return - - try: - for device in self.api.devices: - if str(device) != str(self.devices[devicename]): - continue - - status = device.status(DEVICESTATUSSET) - _LOGGER.debug("Device Status is %s", status) - dev_id = status["name"].replace(" ", "", 99) - dev_id = slugify(dev_id) - attrs[ATTR_DEVICESTATUS] = DEVICESTATUSCODES.get( - status["deviceStatus"], "error" - ) - attrs[ATTR_LOWPOWERMODE] = status["lowPowerMode"] - attrs[ATTR_BATTERYSTATUS] = status["batteryStatus"] - attrs[ATTR_ACCOUNTNAME] = self.accountname - status = device.status(DEVICESTATUSSET) - battery = status.get("batteryLevel", 0) * 100 - location = status["location"] - if location and location["horizontalAccuracy"]: - horizontal_accuracy = int(location["horizontalAccuracy"]) - if horizontal_accuracy < self._gps_accuracy_threshold: - self.determine_interval( - devicename, - location["latitude"], - location["longitude"], - battery, - ) - interval = self._intervals.get(devicename, 1) - attrs[ATTR_INTERVAL] = interval - accuracy = location["horizontalAccuracy"] - kwargs["dev_id"] = dev_id - kwargs["host_name"] = status["name"] - kwargs["gps"] = (location["latitude"], location["longitude"]) - kwargs["battery"] = battery - kwargs["gps_accuracy"] = accuracy - kwargs[ATTR_ATTRIBUTES] = attrs - self.see(**kwargs) - self.seen_devices[devicename] = True - except PyiCloudNoDevicesException: - _LOGGER.error("No iCloud Devices found") - - def lost_iphone(self, devicename): - """Call the lost iPhone function if the device is found.""" - if self.api is None: - return - - self.api.authenticate() - for device in self.api.devices: - if str(device) == str(self.devices[devicename]): - _LOGGER.info("Playing Lost iPhone sound for %s", devicename) - device.play_sound() - - def update_icloud(self, devicename=None): - """Request device information from iCloud and update device_tracker.""" - from pyicloud.exceptions import PyiCloudNoDevicesException - - if self.api is None: - return - - try: - if devicename is not None: - if devicename in self.devices: - self.update_device(devicename) - else: - _LOGGER.error( - "devicename %s unknown for account %s", - devicename, - self._attrs[ATTR_ACCOUNTNAME], - ) - else: - for device in self.devices: - self.update_device(device) - except PyiCloudNoDevicesException: - _LOGGER.error("No iCloud Devices found") +def icon_for_icloud_device(icloud_device: IcloudDevice) -> str: + """Return a battery icon valid identifier.""" + switcher = { + "iPad": "mdi:tablet-ipad", + "iPhone": "mdi:cellphone-iphone", + "iPod": "mdi:ipod", + "iMac": "mdi:desktop-mac", + "MacBookPro": "mdi:laptop-mac", + } - def setinterval(self, interval=None, devicename=None): - """Set the interval of the given devices.""" - devs = [devicename] if devicename else self.devices - for device in devs: - devid = f"{DOMAIN}.{device}" - devicestate = self.hass.states.get(devid) - if interval is not None: - if devicestate is not None: - self._overridestates[device] = run_callback_threadsafe( - self.hass.loop, - async_active_zone, - self.hass, - float(devicestate.attributes.get("latitude", 0)), - float(devicestate.attributes.get("longitude", 0)), - ).result() - if self._overridestates[device] is None: - self._overridestates[device] = "away" - self._intervals[device] = interval - else: - self._overridestates[device] = None - self.update_device(device) + return switcher.get(icloud_device.device_class, "mdi:cellphone-link") diff --git a/homeassistant/components/icloud/manifest.json b/homeassistant/components/icloud/manifest.json index d3924ee61a8b21..f7295ceae4d0cc 100644 --- a/homeassistant/components/icloud/manifest.json +++ b/homeassistant/components/icloud/manifest.json @@ -1,10 +1,13 @@ { "domain": "icloud", - "name": "Icloud", - "documentation": "https://www.home-assistant.io/integrations/icloud", + "name": "iCloud", + "config_flow": true, + "documentation": "https://www.home-assistant.io/components/icloud", "requirements": [ "pyicloud==0.9.1" ], - "dependencies": ["configurator"], - "codeowners": [] -} + "dependencies": [], + "codeowners": [ + "@Quentame" + ] +} \ No newline at end of file diff --git a/homeassistant/components/icloud/services.yaml b/homeassistant/components/icloud/services.yaml index e69de29bb2d1d6..0cefef6895f4b8 100644 --- a/homeassistant/components/icloud/services.yaml +++ b/homeassistant/components/icloud/services.yaml @@ -0,0 +1,58 @@ +# Describes the format for available iCloud services + +reset: + description: Reset the iCloud account and re build it. + fields: + account: + description: Your iCloud account username (email) or account name. + example: 'steve@apple.com' + +update: + description: Update iCloud devices. + fields: + account: + description: Your iCloud account username (email) or account name. + example: 'steve@apple.com' + +play_sound: + description: Play sound on an Apple device. + fields: + account: + description: (required) Your iCloud account username (email) or account name. + example: 'steve@apple.com' + device_name: + description: (required) The name of the Apple device to play a sound. + example: 'stevesiphone' + +display_message: + description: Display a message on an Apple device. + fields: + account: + description: (required) Your iCloud account username (email) or account name. + example: 'steve@apple.com' + device_name: + description: (required) The name of the Apple device to display the message. + example: 'stevesiphone' + message: + description: (required) The content of your message. + example: 'Hey Steve !' + sound: + description: To make a sound when displaying the message (boolean). + example: 'true' + +lost_device: + description: Make an Apple device in lost state. + fields: + account: + description: (required) Your iCloud account username (email) or account name. + example: 'steve@apple.com' + device_name: + description: (required) The name of the Apple device to set lost. + example: 'stevesiphone' + number: + description: (required) The phone number to call in lost mode (must contain country code). + example: '+33450020100' + message: + description: (required) The message to display in lost mode. + example: 'Call me' + diff --git a/homeassistant/components/icloud/strings.json b/homeassistant/components/icloud/strings.json new file mode 100644 index 00000000000000..343a087738f4da --- /dev/null +++ b/homeassistant/components/icloud/strings.json @@ -0,0 +1,38 @@ +{ + "config": { + "title": "Apple iCloud", + "step": { + "user": { + "title": "iCloud credentials", + "description": "Enter your credentials", + "data": { + "username": "Email", + "password": "Password" + } + }, + "trusted_device": { + "title": "iCloud trusted device", + "description": "Select your trusted device", + "data": { + "trusted_device": "Trusted device" + } + }, + "verification_code": { + "title": "iCloud verification code", + "description": "Please enter the verification code you just received from iCloud", + "data": { + "verification_code": "Verification code" + } + } + }, + "error":{ + "username_exists": "Account already configured", + "login": "Login error: please check your email & password", + "send_verification_code": "Failed to send verification code", + "validate_verification_code": "Failed to verify your verification code, choose a trust device and start the verification again", + }, + "abort":{ + "username_exists": "Account already configured" + } + } +} diff --git a/homeassistant/generated/config_flows.py b/homeassistant/generated/config_flows.py index 0cec08d94d9805..8f890fafbcd376 100644 --- a/homeassistant/generated/config_flows.py +++ b/homeassistant/generated/config_flows.py @@ -34,6 +34,7 @@ "huawei_lte", "hue", "iaqualink", + "icloud", "ifttt", "ios", "ipma", diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 4a94664ad0ab54..a01ea4091905e4 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -424,6 +424,9 @@ pyheos==0.6.0 # homeassistant.components.homematic pyhomematic==0.1.61 +# homeassistant.components.icloud +pyicloud==0.9.1 + # homeassistant.components.ipma pyipma==1.2.1 diff --git a/tests/components/icloud/__init__.py b/tests/components/icloud/__init__.py new file mode 100644 index 00000000000000..b85f1017e45d6b --- /dev/null +++ b/tests/components/icloud/__init__.py @@ -0,0 +1 @@ +"""Tests for the iCloud component.""" diff --git a/tests/components/icloud/test_config_flow.py b/tests/components/icloud/test_config_flow.py new file mode 100644 index 00000000000000..1e5b7738b12673 --- /dev/null +++ b/tests/components/icloud/test_config_flow.py @@ -0,0 +1,306 @@ +"""Tests for the iCloud config flow.""" +from unittest.mock import patch, MagicMock, PropertyMock +import pytest + + +from pyicloud.base import PyiCloudService +from pyicloud.exceptions import PyiCloudFailedLoginException + +from homeassistant import data_entry_flow +from homeassistant.components.icloud import config_flow + +from homeassistant.components.icloud.config_flow import ( + CONF_TRUSTED_DEVICE, + CONF_VERIFICATION_CODE, +) +from homeassistant.components.icloud.const import ( + DOMAIN, + CONF_ACCOUNT_NAME, + CONF_GPS_ACCURACY_THRESHOLD, + CONF_MAX_INTERVAL, + DEFAULT_GPS_ACCURACY_THRESHOLD, + DEFAULT_MAX_INTERVAL, +) +from homeassistant.const import CONF_PASSWORD, CONF_USERNAME +from homeassistant.helpers.typing import HomeAssistantType + +from tests.common import MockConfigEntry + +USERNAME = "username@me.com" +PASSWORD = "password" +ACCOUNT_NAME = "Account name 1 2 3" +ACCOUNT_NAME_FROM_USERNAME = None +MAX_INTERVAL = 15 +GPS_ACCURACY_THRESHOLD = 250 + +TRUSTED_DEVICES = [ + {"deviceType": "SMS", "areaCode": "", "phoneNumber": "*******58", "deviceId": "1"} +] + + +@pytest.fixture(name="not_create_cookie") +def mock_controller_not_create_cookie(): + """Mock a non cookie creation.""" + with patch("os.path.exists") as exists_mock: + exists_mock.return_value = True + with patch("pyicloud.base.cookielib"): + yield + + +@pytest.fixture(name="session") +def mock_controller_session(not_create_cookie: MagicMock): + """Mock a successful session.""" + with patch("pyicloud.base.PyiCloudSession"): + yield + + +@pytest.fixture(name="service") +def mock_controller_service(session: MagicMock): + """Mock a successful service.""" + with patch.object( + PyiCloudService, "requires_2fa", new_callable=PropertyMock + ) as mock: + mock.return_value = True + yield mock + + +@pytest.fixture(name="service_with_cookie") +def mock_controller_service_with_cookie(session: MagicMock): + """Mock a successful service while already authenticate.""" + with patch.object( + PyiCloudService, "requires_2fa", new_callable=PropertyMock + ) as mock: + mock.return_value = False + yield mock + + +@pytest.fixture(name="service_send_verification_code_failed") +def mock_controller_service_send_verification_code_failed(session: MagicMock): + """Mock a failed service during sending verification code step.""" + with patch( + "pyicloud.base.PyiCloudService", + requires_2fa=False, + trusted_devices=TRUSTED_DEVICES, + ) as service_mock: + service_mock.send_verification_code.return_value = False + yield service_mock + + +@pytest.fixture(name="service_validate_verification_code_failed") +def mock_controller_service_validate_verification_code_failed(session: MagicMock): + """Mock a failed service during validation of verification code step.""" + with patch( + "pyicloud.base.PyiCloudService", + requires_2fa=False, + trusted_devices=TRUSTED_DEVICES, + ) as service_mock: + service_mock.send_verification_code.return_value = True + service_mock.validate_verification_code.return_value = False + yield service_mock + + +def init_config_flow(hass: HomeAssistantType): + """Init a configuration flow.""" + flow = config_flow.IcloudFlowHandler() + flow.hass = hass + return flow + + +async def test_user(hass: HomeAssistantType, service: MagicMock): + """Test user config.""" + flow = init_config_flow(hass) + + result = await flow.async_step_user() + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "user" + + # test with all provided + result = await flow.async_step_user( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == CONF_TRUSTED_DEVICE + + +async def test_user_with_cookie( + hass: HomeAssistantType, service_with_cookie: MagicMock +): + """Test user config with presence of a cookie.""" + flow = init_config_flow(hass) + + # test with all provided + result = await flow.async_step_user( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == USERNAME + assert result["data"][CONF_USERNAME] == USERNAME + assert result["data"][CONF_PASSWORD] == PASSWORD + assert result["data"][CONF_ACCOUNT_NAME] == ACCOUNT_NAME_FROM_USERNAME + assert result["data"][CONF_MAX_INTERVAL] == DEFAULT_MAX_INTERVAL + assert result["data"][CONF_GPS_ACCURACY_THRESHOLD] == DEFAULT_GPS_ACCURACY_THRESHOLD + + +async def test_import(hass: HomeAssistantType, service: MagicMock): + """Test import step.""" + flow = init_config_flow(hass) + + # import with username and password + result = await flow.async_step_import( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "trusted_device" + + # import with all + result = await flow.async_step_import( + { + CONF_USERNAME: USERNAME, + CONF_PASSWORD: PASSWORD, + CONF_ACCOUNT_NAME: ACCOUNT_NAME, + CONF_MAX_INTERVAL: MAX_INTERVAL, + CONF_GPS_ACCURACY_THRESHOLD: GPS_ACCURACY_THRESHOLD, + } + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "trusted_device" + + +async def test_import_with_cookie( + hass: HomeAssistantType, service_with_cookie: MagicMock +): + """Test import step with presence of a cookie.""" + flow = init_config_flow(hass) + + # import with username and password + result = await flow.async_step_import( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == USERNAME + assert result["data"][CONF_USERNAME] == USERNAME + assert result["data"][CONF_PASSWORD] == PASSWORD + assert result["data"][CONF_ACCOUNT_NAME] == ACCOUNT_NAME_FROM_USERNAME + assert result["data"][CONF_MAX_INTERVAL] == DEFAULT_MAX_INTERVAL + assert result["data"][CONF_GPS_ACCURACY_THRESHOLD] == DEFAULT_GPS_ACCURACY_THRESHOLD + + # import with all + result = await flow.async_step_import( + { + CONF_USERNAME: USERNAME, + CONF_PASSWORD: PASSWORD, + CONF_ACCOUNT_NAME: ACCOUNT_NAME, + CONF_MAX_INTERVAL: MAX_INTERVAL, + CONF_GPS_ACCURACY_THRESHOLD: GPS_ACCURACY_THRESHOLD, + } + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == USERNAME + assert result["data"][CONF_USERNAME] == USERNAME + assert result["data"][CONF_PASSWORD] == PASSWORD + assert result["data"][CONF_ACCOUNT_NAME] == ACCOUNT_NAME + assert result["data"][CONF_MAX_INTERVAL] == MAX_INTERVAL + assert result["data"][CONF_GPS_ACCURACY_THRESHOLD] == GPS_ACCURACY_THRESHOLD + + +async def test_abort_if_already_setup(hass: HomeAssistantType): + """Test we abort if the account is already setup.""" + flow = init_config_flow(hass) + MockConfigEntry( + domain=DOMAIN, data={CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ).add_to_hass(hass) + + # Should fail, same USERNAME (import) + result = await flow.async_step_import( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "username_exists" + + # Should fail, same ACCOUNT_NAME (import) + result = await flow.async_step_import( + { + CONF_USERNAME: "other_username@icloud.com", + CONF_PASSWORD: PASSWORD, + CONF_ACCOUNT_NAME: ACCOUNT_NAME_FROM_USERNAME, + } + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "username_exists" + + # Should fail, same USERNAME (flow) + result = await flow.async_step_user( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["errors"] == {CONF_USERNAME: "username_exists"} + + +async def test_abort_on_login_failed(hass: HomeAssistantType): + """Test when we have errors during login.""" + flow = init_config_flow(hass) + + with patch( + "pyicloud.base.PyiCloudService.authenticate", + side_effect=PyiCloudFailedLoginException(), + ): + result = await flow.async_step_user( + {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD} + ) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["errors"] == {CONF_USERNAME: "login"} + + +async def test_trusted_device(hass: HomeAssistantType, service: MagicMock): + """Test trusted_device step.""" + flow = init_config_flow(hass) + flow.api = service + + result = await flow.async_step_trusted_device() + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == CONF_TRUSTED_DEVICE + + +async def test_trusted_device_success(hass: HomeAssistantType, service: MagicMock): + """Test trusted_device step success.""" + flow = init_config_flow(hass) + flow.api = service + + result = await flow.async_step_trusted_device({CONF_TRUSTED_DEVICE: 0}) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == CONF_VERIFICATION_CODE + + +async def test_abort_on_send_verification_code_failed( + hass: HomeAssistantType, service_send_verification_code_failed: MagicMock +): + """Test when we have errors during send_verification_code.""" + flow = init_config_flow(hass) + flow.api = service_send_verification_code_failed + + result = await flow.async_step_trusted_device({CONF_TRUSTED_DEVICE: 0}) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == CONF_TRUSTED_DEVICE + assert result["errors"] == {CONF_TRUSTED_DEVICE: "send_verification_code"} + + +async def test_verification_code(hass: HomeAssistantType): + """Test verification_code step.""" + flow = init_config_flow(hass) + + result = await flow.async_step_verification_code() + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == CONF_VERIFICATION_CODE + + +async def test_abort_on_validate_verification_code_failed( + hass: HomeAssistantType, service_validate_verification_code_failed: MagicMock +): + """Test when we have errors during validate_verification_code.""" + flow = init_config_flow(hass) + flow.api = service_validate_verification_code_failed + + result = await flow.async_step_verification_code({CONF_VERIFICATION_CODE: 0}) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == CONF_TRUSTED_DEVICE + assert result["errors"] == {"base": "validate_verification_code"}