diff --git a/homeassistant/components/plugwise/__init__.py b/homeassistant/components/plugwise/__init__.py index 47a9a1e7d9c1d7..999cd25228ed93 100644 --- a/homeassistant/components/plugwise/__init__.py +++ b/homeassistant/components/plugwise/__init__.py @@ -4,7 +4,9 @@ from homeassistant.const import CONF_HOST from homeassistant.core import HomeAssistant +from .const import CONF_USB_PATH from .gateway import async_setup_entry_gw, async_unload_entry_gw +from .usb import async_setup_entry_usb, async_unload_entry_usb async def async_setup(hass: HomeAssistant, config: dict): @@ -16,7 +18,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up Plugwise components from a config entry.""" if entry.data.get(CONF_HOST): return await async_setup_entry_gw(hass, entry) - # PLACEHOLDER USB entry setup + if entry.data.get(CONF_USB_PATH): + return await async_setup_entry_usb(hass, entry) return False @@ -24,5 +27,6 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry): """Unload the Plugwise components.""" if entry.data.get(CONF_HOST): return await async_unload_entry_gw(hass, entry) - # PLACEHOLDER USB entry setup + if entry.data.get(CONF_USB_PATH): + return await async_unload_entry_usb(hass, entry) return False diff --git a/homeassistant/components/plugwise/config_flow.py b/homeassistant/components/plugwise/config_flow.py index e0d2262773704d..d7efb88747c770 100644 --- a/homeassistant/components/plugwise/config_flow.py +++ b/homeassistant/components/plugwise/config_flow.py @@ -1,8 +1,19 @@ """Config flow for Plugwise integration.""" import logging - -from plugwise.exceptions import InvalidAuthentication, PlugwiseException +import os +from typing import Dict + +import plugwise +from plugwise.exceptions import ( + InvalidAuthentication, + NetworkDown, + PlugwiseException, + PortError, + StickInitError, + TimeoutException, +) from plugwise.smile import Smile +import serial.tools.list_ports import voluptuous as vol from homeassistant import config_entries, core, exceptions @@ -20,14 +31,77 @@ from homeassistant.helpers.typing import DiscoveryInfoType from .const import ( # pylint:disable=unused-import + CONF_USB_PATH, DEFAULT_PORT, DEFAULT_SCAN_INTERVAL, DOMAIN, + FLOW_NET, + FLOW_TYPE, + FLOW_USB, + PW_TYPE, + SMILE, + STICK, + STRETCH, ZEROCONF_MAP, ) _LOGGER = logging.getLogger(__name__) +CONF_MANUAL_PATH = "Enter Manually" + +CONNECTION_SCHEMA = vol.Schema( + { + vol.Required(FLOW_TYPE, default=FLOW_NET): vol.In( + {FLOW_NET: f"Network: {SMILE} / {STRETCH}", FLOW_USB: "USB: Stick"} + ), + }, +) + + +@callback +def plugwise_stick_entries(hass): + """Return existing connections for Plugwise USB-stick domain.""" + sticks = [] + for entry in hass.config_entries.async_entries(DOMAIN): + if entry.data.get(PW_TYPE) == STICK: + sticks.append(entry.data.get(CONF_USB_PATH)) + return sticks + + +async def validate_usb_connection(self, device_path=None) -> Dict[str, str]: + """Test if device_path is a real Plugwise USB-Stick.""" + errors = {} + # Avoid creating a 2nd connection to an already configured stick + if device_path in plugwise_stick_entries(self): + errors[CONF_BASE] = "already_configured" + return errors, None + + stick = await self.async_add_executor_job(plugwise.stick, device_path) + try: + await self.async_add_executor_job(stick.connect) + await self.async_add_executor_job(stick.initialize_stick) + await self.async_add_executor_job(stick.disconnect) + except PortError: + errors[CONF_BASE] = "cannot_connect" + except StickInitError: + errors[CONF_BASE] = "stick_init" + except NetworkDown: + errors[CONF_BASE] = "network_down" + except TimeoutException: + errors[CONF_BASE] = "network_timeout" + return errors, stick + + +def get_serial_by_id(dev_path: str) -> str: + """Return a /dev/serial/by-id match for given device if available.""" + by_id = "/dev/serial/by-id" + if not os.path.isdir(by_id): + return dev_path + for path in (entry.path for entry in os.scandir(by_id) if entry.is_symlink()): + if os.path.realpath(path) == dev_path: + return path + return dev_path + def _base_gw_schema(discovery_info): """Generate base schema for gateways.""" @@ -108,7 +182,7 @@ async def async_step_zeroconf(self, discovery_info: DiscoveryInfoType): CONF_PORT: discovery_info.get(CONF_PORT, DEFAULT_PORT), CONF_NAME: _name, } - return await self.async_step_user() + return await self.async_step_user({FLOW_TYPE: FLOW_NET}) async def async_step_user_gateway(self, user_input=None): """Handle the initial step for gateways.""" @@ -148,13 +222,79 @@ async def async_step_user_gateway(self, user_input=None): errors=errors or {}, ) - # PLACEHOLDER USB async_step_user_usb and async_step_user_usb_manual_paht + async def async_step_user_usb(self, user_input=None): + """Step when user initializes a integration.""" + errors = {} + ports = await self.hass.async_add_executor_job(serial.tools.list_ports.comports) + list_of_ports = [ + f"{p}, s/n: {p.serial_number or 'n/a'}" + + (f" - {p.manufacturer}" if p.manufacturer else "") + for p in ports + ] + list_of_ports.append(CONF_MANUAL_PATH) + + if user_input is not None: + user_selection = user_input[CONF_USB_PATH] + if user_selection == CONF_MANUAL_PATH: + return await self.async_step_manual_path() + port = ports[list_of_ports.index(user_selection)] + device_path = await self.hass.async_add_executor_job( + get_serial_by_id, port.device + ) + errors, stick = await validate_usb_connection(self.hass, device_path) + if not errors: + await self.async_set_unique_id(stick.get_mac_stick()) + return self.async_create_entry( + title="Stick", data={CONF_USB_PATH: device_path, PW_TYPE: STICK} + ) + return self.async_show_form( + step_id="user_usb", + data_schema=vol.Schema( + {vol.Required(CONF_USB_PATH): vol.In(list_of_ports)} + ), + errors=errors, + ) + + async def async_step_manual_path(self, user_input=None): + """Step when manual path to device.""" + errors = {} + if user_input is not None: + device_path = await self.hass.async_add_executor_job( + get_serial_by_id, user_input.get(CONF_USB_PATH) + ) + errors, stick = await validate_usb_connection(self.hass, device_path) + if not errors: + await self.async_set_unique_id(stick.get_mac_stick()) + return self.async_create_entry( + title="Stick", data={CONF_USB_PATH: device_path} + ) + return self.async_show_form( + step_id="manual_path", + data_schema=vol.Schema( + { + vol.Required( + CONF_USB_PATH, default="/dev/ttyUSB0" or vol.UNDEFINED + ): str + } + ), + errors=errors, + ) async def async_step_user(self, user_input=None): - """Handle the initial step.""" + """Handle the initial step when using network/gateway setups.""" + errors = {} + if user_input is not None: + if user_input[FLOW_TYPE] == FLOW_NET: + return await self.async_step_user_gateway() - # PLACEHOLDER USB vs Gateway Logic - return await self.async_step_user_gateway() + if user_input[FLOW_TYPE] == FLOW_USB: + return await self.async_step_user_usb() + + return self.async_show_form( + step_id="user", + data_schema=CONNECTION_SCHEMA, + errors=errors, + ) @staticmethod @callback diff --git a/homeassistant/components/plugwise/const.py b/homeassistant/components/plugwise/const.py index c6ef43af602667..77381ea1101796 100644 --- a/homeassistant/components/plugwise/const.py +++ b/homeassistant/components/plugwise/const.py @@ -1,10 +1,36 @@ """Constant for Plugwise component.""" + +from homeassistant.components.switch import DEVICE_CLASS_OUTLET +from homeassistant.const import ( + ATTR_DEVICE_CLASS, + ATTR_ICON, + ATTR_NAME, + ATTR_STATE, + ATTR_UNIT_OF_MEASUREMENT, + DEVICE_CLASS_POWER, + ENERGY_KILO_WATT_HOUR, + POWER_WATT, +) + +ATTR_ENABLED_DEFAULT = "enabled_default" DOMAIN = "plugwise" +GATEWAY = "gateway" +PW_TYPE = "plugwise_type" +SMILE = "smile" +STICK = "stick" +STRETCH = "stretch" +USB = "usb" -SENSOR_PLATFORMS = ["sensor", "switch"] PLATFORMS_GATEWAY = ["binary_sensor", "climate", "sensor", "switch"] +PLATFORMS_USB = ["switch"] +SENSOR_PLATFORMS = ["sensor", "switch"] PW_TYPE = "plugwise_type" -GATEWAY = "gateway" + +FLOW_NET = "flow_network" +FLOW_TYPE = "flow_type" +FLOW_USB = "flow_usb" +FLOW_SMILE = "smile (Adam/Anna/P1)" +FLOW_STRETCH = "stretch (Stretch)" # Sensor mapping SENSOR_MAP_DEVICE_CLASS = 2 @@ -28,6 +54,7 @@ CONF_MIN_TEMP = "min_temp" CONF_POWER = "power" CONF_THERMOSTAT = "thermostat" +CONF_USB_PATH = "usb_path" ATTR_ILLUMINANCE = "illuminance" @@ -55,3 +82,54 @@ "smile_open_therm": "Adam", "stretch": "Stretch", } + +# Callback types +CB_NEW_NODE = "NEW_NODE" + +# Sensor IDs +AVAILABLE_SENSOR_ID = "available" +CURRENT_POWER_SENSOR_ID = "power_1s" +TODAY_ENERGY_SENSOR_ID = "power_con_today" + +ATTR_MAC_ADDRESS = "mac" + +# Sensor types +USB_SENSORS = { + AVAILABLE_SENSOR_ID: { + ATTR_DEVICE_CLASS: None, + ATTR_ENABLED_DEFAULT: False, + ATTR_ICON: "mdi:signal-off", + ATTR_NAME: "Available", + ATTR_STATE: "get_available", + ATTR_UNIT_OF_MEASUREMENT: None, + }, + CURRENT_POWER_SENSOR_ID: { + ATTR_DEVICE_CLASS: DEVICE_CLASS_POWER, + ATTR_ENABLED_DEFAULT: True, + ATTR_ICON: None, + ATTR_NAME: "Power usage", + ATTR_STATE: "get_power_usage", + ATTR_UNIT_OF_MEASUREMENT: POWER_WATT, + }, + TODAY_ENERGY_SENSOR_ID: { + ATTR_DEVICE_CLASS: DEVICE_CLASS_POWER, + ATTR_ENABLED_DEFAULT: True, + ATTR_ICON: None, + ATTR_NAME: "Power consumption today", + ATTR_STATE: "get_power_consumption_today", + ATTR_UNIT_OF_MEASUREMENT: ENERGY_KILO_WATT_HOUR, + }, +} + +# Switch types +USB_SWITCHES = { + "relay": { + ATTR_DEVICE_CLASS: DEVICE_CLASS_OUTLET, + ATTR_ENABLED_DEFAULT: True, + ATTR_ICON: None, + ATTR_NAME: "Relay state", + ATTR_STATE: "get_relay_state", + "switch": "set_relay_state", + ATTR_UNIT_OF_MEASUREMENT: "state", + } +} diff --git a/homeassistant/components/plugwise/strings.json b/homeassistant/components/plugwise/strings.json index 2ed6721bab36dc..2ecd6cbba7a693 100644 --- a/homeassistant/components/plugwise/strings.json +++ b/homeassistant/components/plugwise/strings.json @@ -1,6 +1,10 @@ { "options": { "step": { + "none": { + "title": "No Options available", + "description": "This Integration does not provide any Options" + }, "init": { "description": "Adjust Plugwise Options", "data": { @@ -10,6 +14,7 @@ } }, "config": { + "flow_title": "Smile: {name}", "step": { "user": { "title": "Plugwise type", @@ -20,23 +25,38 @@ }, "user_gateway": { "title": "Connect to the Smile", - "description": "Please enter", + "description": "Please enter:", "data": { "password": "Smile ID", + "username" : "Smile Username", "host": "[%key:common::config_flow::data::ip%]", - "port": "[%key:common::config_flow::data::port%]", - "username" : "Smile Username" + "port": "[%key:common::config_flow::data::port%]" + } + }, + "user_usb": { + "title": "Setup direct connection to legacy Plugwise USB-stick", + "description": "Please enter:", + "data": { + "usb_path": "[%key:common::config_flow::data::usb_path%]" + } + }, + "manual_path": { + "data": { + "usb_path": "[%key:common::config_flow::data::usb_path%]" } } }, "error": { + "already_configured": "[%key:common::config_flow::abort::already_configured_device%]", + "invalid_auth": "Invalid authentication, check the 8 characters of your Smile ID", "cannot_connect": "[%key:common::config_flow::error::cannot_connect%]", - "invalid_auth": "[%key:common::config_flow::error::invalid_auth%]", + "network_down": "Plugwise Zigbee network is down", + "network_timeout": "Network communication timeout", + "stick_init": "Initialization of Plugwise USB-stick failed", "unknown": "[%key:common::config_flow::error::unknown%]" }, "abort": { "already_configured": "[%key:common::config_flow::abort::already_configured_service%]" - }, - "flow_title": "Smile: {name}" + } } } diff --git a/homeassistant/components/plugwise/switch.py b/homeassistant/components/plugwise/switch.py index ce3be04681ac74..0df05c685d37ee 100644 --- a/homeassistant/components/plugwise/switch.py +++ b/homeassistant/components/plugwise/switch.py @@ -5,20 +5,60 @@ from plugwise.exceptions import PlugwiseException from homeassistant.components.switch import SwitchEntity +from homeassistant.const import ATTR_DEVICE_CLASS, ATTR_ICON, ATTR_STATE from homeassistant.core import callback -from .const import COORDINATOR, DOMAIN, SWITCH_ICON +from .const import ( + ATTR_ENABLED_DEFAULT, + AVAILABLE_SENSOR_ID, + CB_NEW_NODE, + COORDINATOR, + CURRENT_POWER_SENSOR_ID, + DOMAIN, + PW_TYPE, + STICK, + SWITCH_ICON, + TODAY_ENERGY_SENSOR_ID, + USB, + USB_SENSORS, + USB_SWITCHES, +) from .gateway import SmileGateway +from .usb import NodeEntity _LOGGER = logging.getLogger(__name__) async def async_setup_entry(hass, config_entry, async_add_entities): """Set up the Smile switches from a config entry.""" - # PLACEHOLDER USB entry setup + if hass.data[DOMAIN][config_entry.entry_id][PW_TYPE] == USB: + return await async_setup_entry_usb(hass, config_entry, async_add_entities) + # Considered default and for earlier setups without usb/network config_flow return await async_setup_entry_gateway(hass, config_entry, async_add_entities) +async def async_setup_entry_usb(hass, config_entry, async_add_entities): + """Set up the USB switches from a config entry.""" + stick = hass.data[DOMAIN][config_entry.entry_id][STICK] + + async def async_add_switch(mac): + """Add plugwise switch.""" + node = stick.node(mac) + for switch_type in node.get_switches(): + if switch_type in USB_SWITCHES: + async_add_entities([USBSwitch(node, mac, switch_type)]) + + for mac in hass.data[DOMAIN][config_entry.entry_id]["switch"]: + hass.async_create_task(async_add_switch(mac)) + + def discoved_switch(mac): + """Add newly discovered switch.""" + hass.async_create_task(async_add_switch(mac)) + + # Listen for discovered nodes + stick.subscribe_stick_callback(discoved_switch, CB_NEW_NODE) + + async def async_setup_entry_gateway(hass, config_entry, async_add_entities): """Set up the Smile switches from a config entry.""" api = hass.data[DOMAIN][config_entry.entry_id]["api"] @@ -114,3 +154,79 @@ def _async_process_data(self): self._is_on = data["relay"] self.async_write_ha_state() + + +class USBSwitch(NodeEntity, SwitchEntity): + """Representation of a switch.""" + + def __init__(self, node, mac, switch_id): + """Initialize a Node entity.""" + super().__init__(node, mac) + self.switch_id = switch_id + self.switch_type = USB_SWITCHES[self.switch_id] + if (CURRENT_POWER_SENSOR_ID in node.get_sensors()) and ( + TODAY_ENERGY_SENSOR_ID in node.get_sensors() + ): + self.node_callbacks = ( + AVAILABLE_SENSOR_ID, + switch_id, + CURRENT_POWER_SENSOR_ID, + TODAY_ENERGY_SENSOR_ID, + ) + else: + self.node_callbacks = (AVAILABLE_SENSOR_ID, self.switch_id) + + @property + def current_power_w(self): + """Return the current power usage in W.""" + current_power = getattr( + self._node, USB_SENSORS[CURRENT_POWER_SENSOR_ID][ATTR_STATE] + )() + if current_power: + return float(round(current_power, 2)) + return None + + @property + def device_class(self): + """Return the device class of this switch.""" + return self.switch_type[ATTR_DEVICE_CLASS] + + @property + def entity_registry_enabled_default(self): + """Return the switch registration state.""" + return self.switch_type[ATTR_ENABLED_DEFAULT] + + @property + def icon(self): + """Return the icon.""" + return ( + None if self.switch_type[ATTR_DEVICE_CLASS] else self.switch_type[ATTR_ICON] + ) + + @property + def is_on(self): + """Return true if the switch is on.""" + return getattr(self._node, self.switch_type[ATTR_STATE])() + + @property + def today_energy_kwh(self): + """Return the today total energy usage in kWh.""" + today_energy = getattr( + self._node, USB_SENSORS[TODAY_ENERGY_SENSOR_ID][ATTR_STATE] + )() + if today_energy: + return float(round(today_energy, 3)) + return None + + def turn_off(self, **kwargs): + """Instruct the switch to turn off.""" + getattr(self._node, self.switch_type["switch"])(False) + + def turn_on(self, **kwargs): + """Instruct the switch to turn on.""" + getattr(self._node, self.switch_type["switch"])(True) + + @property + def unique_id(self): + """Get unique ID.""" + return f"{self._mac}-{self.switch_id}" diff --git a/homeassistant/components/plugwise/usb.py b/homeassistant/components/plugwise/usb.py new file mode 100644 index 00000000000000..5058152faa959e --- /dev/null +++ b/homeassistant/components/plugwise/usb.py @@ -0,0 +1,179 @@ +"""Support for Plugwise devices connected to a Plugwise USB-stick.""" +import asyncio +import logging + +import plugwise +from plugwise.exceptions import ( + CirclePlusError, + NetworkDown, + PortError, + StickInitError, + TimeoutException, +) + +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import ATTR_STATE, EVENT_HOMEASSISTANT_STOP +from homeassistant.core import HomeAssistant +from homeassistant.exceptions import ConfigEntryNotReady +from homeassistant.helpers.entity import Entity + +from .const import ( + AVAILABLE_SENSOR_ID, + CONF_USB_PATH, + DOMAIN, + PLATFORMS_USB, + PW_TYPE, + STICK, + UNDO_UPDATE_LISTENER, + USB, + USB_SENSORS, +) + +_LOGGER = logging.getLogger(__name__) + + +async def async_setup_entry_usb(hass: HomeAssistant, config_entry: ConfigEntry): + """Establish connection with plugwise USB-stick.""" + hass.data.setdefault(DOMAIN, {}) + + def discover_finished(): + """Create entities for all discovered nodes.""" + nodes = stick.nodes() + _LOGGER.debug( + "Successfully discovered %s out of %s registered nodes", + str(len(nodes)), + str(stick.registered_nodes()), + ) + for component in PLATFORMS_USB: + hass.data[DOMAIN][config_entry.entry_id][component] = [] + for mac in nodes: + if component in stick.node(mac).get_categories(): + hass.data[DOMAIN][config_entry.entry_id][component].append(mac) + hass.async_create_task( + hass.config_entries.async_forward_entry_setup(config_entry, component) + ) + stick.auto_update() + # Enable reception of join request and automatically accept new node join requests + stick.allow_join_requests(True, True) + + def shutdown(event): + hass.async_add_executor_job(stick.disconnect) + + stick = plugwise.stick(config_entry.data[CONF_USB_PATH]) + hass.data[DOMAIN][config_entry.entry_id] = {PW_TYPE: USB, STICK: stick} + try: + _LOGGER.debug("Connect to USB-Stick") + await hass.async_add_executor_job(stick.connect) + _LOGGER.debug("Initialize USB-stick") + await hass.async_add_executor_job(stick.initialize_stick) + _LOGGER.debug("Discover Circle+ node") + await hass.async_add_executor_job(stick.initialize_circle_plus) + except PortError as err: + _LOGGER.error("Connecting to Plugwise USBstick communication failed") + raise ConfigEntryNotReady from err + except StickInitError as err: + _LOGGER.error("Initializing of Plugwise USBstick communication failed") + await hass.async_add_executor_job(stick.disconnect) + raise ConfigEntryNotReady from err + except NetworkDown as err: + _LOGGER.warning("Plugwise zigbee network down") + await hass.async_add_executor_job(stick.disconnect) + raise ConfigEntryNotReady from err + except CirclePlusError as err: + _LOGGER.warning("Failed to connect to Circle+ node") + await hass.async_add_executor_job(stick.disconnect) + raise ConfigEntryNotReady from err + except TimeoutException as err: + _LOGGER.warning("Timeout") + await hass.async_add_executor_job(stick.disconnect) + raise ConfigEntryNotReady from err + _LOGGER.debug("Start discovery of registered nodes") + stick.scan(discover_finished) + + # Listen when EVENT_HOMEASSISTANT_STOP is fired + hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) + + # Listen for entry updates + hass.data[DOMAIN][config_entry.entry_id][ + UNDO_UPDATE_LISTENER + ] = config_entry.add_update_listener(_async_update_listener) + + return True + + +async def async_unload_entry_usb(hass: HomeAssistant, config_entry: ConfigEntry): + """Unload the Plugwise stick connection.""" + unload_ok = all( + await asyncio.gather( + *[ + hass.config_entries.async_forward_entry_unload(config_entry, component) + for component in PLATFORMS_USB + ] + ) + ) + hass.data[DOMAIN][config_entry.entry_id][UNDO_UPDATE_LISTENER]() + if unload_ok: + stick = hass.data[DOMAIN][config_entry.entry_id]["stick"] + await hass.async_add_executor_job(stick.disconnect) + hass.data[DOMAIN].pop(config_entry.entry_id) + return unload_ok + + +async def _async_update_listener(hass: HomeAssistant, config_entry: ConfigEntry): + """Handle options update.""" + await hass.config_entries.async_reload(config_entry.entry_id) + + +class NodeEntity(Entity): + """Base class for a Plugwise entities.""" + + def __init__(self, node, mac): + """Initialize a Node entity.""" + self._node = node + self._mac = mac + self.node_callbacks = (AVAILABLE_SENSOR_ID,) + + async def async_added_to_hass(self): + """Subscribe to updates.""" + for node_callback in self.node_callbacks: + self._node.subscribe_callback(self.sensor_update, node_callback) + + async def async_will_remove_from_hass(self): + """Unsubscribe to updates.""" + for node_callback in self.node_callbacks: + self._node.unsubscribe_callback(self.sensor_update, node_callback) + + @property + def available(self): + """Return the availability of this entity.""" + return getattr(self._node, USB_SENSORS[AVAILABLE_SENSOR_ID][ATTR_STATE])() + + @property + def device_info(self): + """Return the device info.""" + return { + "identifiers": {(DOMAIN, self._mac)}, + "name": f"{self._node.get_node_type()} ({self._mac})", + "manufacturer": "Plugwise", + "model": self._node.get_node_type(), + "sw_version": f"{self._node.get_firmware_version()}", + } + + @property + def name(self): + """Return the display name of this entity.""" + return f"{self._node.get_node_type()} {self._mac[-5:]}" + + def sensor_update(self, state): + """Handle status update of Entity.""" + self.schedule_update_ha_state() + + @property + def should_poll(self): + """Disable polling.""" + return False + + @property + def unique_id(self): + """Get unique ID.""" + return f"{self._mac}-{self._node.get_node_type()}" diff --git a/tests/components/plugwise/test_config_flow.py b/tests/components/plugwise/test_config_flow.py index 382e7bc1a52a95..683d3aeb381762 100644 --- a/tests/components/plugwise/test_config_flow.py +++ b/tests/components/plugwise/test_config_flow.py @@ -1,18 +1,34 @@ """Test the Plugwise config flow.""" +import os from unittest.mock import MagicMock, patch from plugwise.exceptions import ( ConnectionFailedError, InvalidAuthentication, + NetworkDown, PlugwiseException, + StickInitError, + TimeoutException, ) import pytest +import serial.tools.list_ports +from voluptuous.error import MultipleInvalid -from homeassistant import config_entries, data_entry_flow, setup +from homeassistant import setup +from homeassistant.components.plugwise.config_flow import ( + CONF_MANUAL_PATH, + get_serial_by_id, +) from homeassistant.components.plugwise.const import ( + CONF_USB_PATH, DEFAULT_PORT, DEFAULT_SCAN_INTERVAL, DOMAIN, + FLOW_NET, + FLOW_TYPE, + FLOW_USB, + PW_TYPE, + STICK, ) from homeassistant.config_entries import SOURCE_USER, SOURCE_ZEROCONF from homeassistant.const import ( @@ -21,9 +37,12 @@ CONF_PASSWORD, CONF_PORT, CONF_SCAN_INTERVAL, + CONF_SOURCE, CONF_USERNAME, ) +from homeassistant.data_entry_flow import RESULT_TYPE_CREATE_ENTRY, RESULT_TYPE_FORM +from tests.async_mock import AsyncMock, sentinel from tests.common import MockConfigEntry TEST_HOST = "1.1.1.1" @@ -32,6 +51,8 @@ TEST_PORT = 81 TEST_USERNAME = "smile" TEST_USERNAME2 = "stretch" +TEST_USBPORT = "/dev/ttyUSB1" +TEST_USBPORT2 = "/dev/ttyUSB2" TEST_DISCOVERY = { "host": TEST_HOST, @@ -46,26 +67,79 @@ } +class FakeStick: + """Mock class to emulate mac address fetching.""" + + def get_mac_stick(mac="01:23:45:67:AB"): + """Mock function returning a MAC address.""" + return mac + + @pytest.fixture(name="mock_smile") def mock_smile(): """Create a Mock Smile for testing exceptions.""" with patch( "homeassistant.components.plugwise.config_flow.Smile", ) as smile_mock: - smile_mock.PlugwiseError = PlugwiseException + smile_mock.PlugwiseException = PlugwiseException smile_mock.InvalidAuthentication = InvalidAuthentication smile_mock.ConnectionFailedError = ConnectionFailedError smile_mock.return_value.connect.return_value = True yield smile_mock.return_value +def com_port(): + """Mock of a serial port.""" + port = serial.tools.list_ports_common.ListPortInfo(TEST_USBPORT) + port.serial_number = "1234" + port.manufacturer = "Virtual serial port" + port.description = "Some serial port" + return port + + +async def test_form_flow_gateway(hass): + """Test we get the form for Plugwise Gateway product type.""" + await setup.async_setup_component(hass, "persistent_notification", {}) + result = await hass.config_entries.flow.async_init( + DOMAIN, context={CONF_SOURCE: SOURCE_USER} + ) + assert result["type"] == RESULT_TYPE_FORM + assert result["errors"] == {} + assert result["step_id"] == "user" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={FLOW_TYPE: FLOW_NET} + ) + assert result["type"] == RESULT_TYPE_FORM + assert result["errors"] == {} + assert result["step_id"] == "user_gateway" + + +async def test_form_flow_usb(hass): + """Test we get the form for Plugwise USB product type.""" + await setup.async_setup_component(hass, "persistent_notification", {}) + result = await hass.config_entries.flow.async_init( + DOMAIN, context={CONF_SOURCE: SOURCE_USER} + ) + assert result["type"] == RESULT_TYPE_FORM + assert result["errors"] == {} + assert result["step_id"] == "user" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={FLOW_TYPE: FLOW_USB} + ) + assert result["type"] == RESULT_TYPE_FORM + assert result["errors"] == {} + assert result["step_id"] == "user_usb" + + async def test_form(hass): """Test we get the form.""" await setup.async_setup_component(hass, "persistent_notification", {}) result = await hass.config_entries.flow.async_init( - DOMAIN, context={"source": SOURCE_USER} + DOMAIN, context={CONF_SOURCE: SOURCE_USER}, data={FLOW_TYPE: FLOW_NET} ) - assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["type"] == RESULT_TYPE_FORM assert result["errors"] == {} with patch( @@ -80,12 +154,12 @@ async def test_form(hass): ) as mock_setup_entry: result2 = await hass.config_entries.flow.async_configure( result["flow_id"], - {CONF_HOST: TEST_HOST, CONF_PASSWORD: TEST_PASSWORD}, + user_input={CONF_HOST: TEST_HOST, CONF_PASSWORD: TEST_PASSWORD}, ) await hass.async_block_till_done() - assert result2["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result2["type"] == RESULT_TYPE_CREATE_ENTRY assert result2["data"] == { CONF_HOST: TEST_HOST, CONF_PASSWORD: TEST_PASSWORD, @@ -102,10 +176,10 @@ async def test_zeroconf_form(hass): await setup.async_setup_component(hass, "persistent_notification", {}) result = await hass.config_entries.flow.async_init( DOMAIN, - context={"source": SOURCE_ZEROCONF}, + context={CONF_SOURCE: SOURCE_ZEROCONF}, data=TEST_DISCOVERY, ) - assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["type"] == RESULT_TYPE_FORM assert result["errors"] == {} with patch( @@ -120,12 +194,12 @@ async def test_zeroconf_form(hass): ) as mock_setup_entry: result2 = await hass.config_entries.flow.async_configure( result["flow_id"], - {CONF_PASSWORD: TEST_PASSWORD}, + user_input={CONF_PASSWORD: TEST_PASSWORD}, ) await hass.async_block_till_done() - assert result2["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result2["type"] == RESULT_TYPE_CREATE_ENTRY assert result2["data"] == { CONF_HOST: TEST_HOST, CONF_PASSWORD: TEST_PASSWORD, @@ -140,9 +214,9 @@ async def test_zeroconf_form(hass): async def test_form_username(hass): """Test we get the username data back.""" result = await hass.config_entries.flow.async_init( - DOMAIN, context={"source": SOURCE_USER} + DOMAIN, context={CONF_SOURCE: SOURCE_USER}, data={FLOW_TYPE: FLOW_NET} ) - assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["type"] == RESULT_TYPE_FORM assert result["errors"] == {} with patch( @@ -166,7 +240,7 @@ async def test_form_username(hass): await hass.async_block_till_done() - assert result2["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result2["type"] == RESULT_TYPE_CREATE_ENTRY assert result2["data"] == { CONF_HOST: TEST_HOST, CONF_PASSWORD: TEST_PASSWORD, @@ -179,10 +253,10 @@ async def test_form_username(hass): result3 = await hass.config_entries.flow.async_init( DOMAIN, - context={"source": SOURCE_ZEROCONF}, + context={CONF_SOURCE: SOURCE_ZEROCONF}, data=TEST_DISCOVERY, ) - assert result3["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result3["type"] == RESULT_TYPE_FORM assert result3["errors"] == {} with patch( @@ -197,7 +271,7 @@ async def test_form_username(hass): ) as mock_setup_entry: result4 = await hass.config_entries.flow.async_configure( result3["flow_id"], - {CONF_PASSWORD: TEST_PASSWORD}, + user_input={CONF_PASSWORD: TEST_PASSWORD}, ) await hass.async_block_till_done() @@ -209,7 +283,7 @@ async def test_form_username(hass): async def test_form_invalid_auth(hass, mock_smile): """Test we handle invalid auth.""" result = await hass.config_entries.flow.async_init( - DOMAIN, context={"source": config_entries.SOURCE_USER} + DOMAIN, context={CONF_SOURCE: SOURCE_USER}, data={FLOW_TYPE: FLOW_NET} ) mock_smile.connect.side_effect = InvalidAuthentication @@ -217,17 +291,17 @@ async def test_form_invalid_auth(hass, mock_smile): result2 = await hass.config_entries.flow.async_configure( result["flow_id"], - {CONF_HOST: TEST_HOST, CONF_PASSWORD: TEST_PASSWORD}, + user_input={CONF_HOST: TEST_HOST, CONF_PASSWORD: TEST_PASSWORD}, ) - assert result2["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result2["type"] == RESULT_TYPE_FORM assert result2["errors"] == {"base": "invalid_auth"} async def test_form_cannot_connect(hass, mock_smile): """Test we handle cannot connect error.""" result = await hass.config_entries.flow.async_init( - DOMAIN, context={"source": config_entries.SOURCE_USER} + DOMAIN, context={CONF_SOURCE: SOURCE_USER}, data={FLOW_TYPE: FLOW_NET} ) mock_smile.connect.side_effect = ConnectionFailedError @@ -235,17 +309,17 @@ async def test_form_cannot_connect(hass, mock_smile): result2 = await hass.config_entries.flow.async_configure( result["flow_id"], - {CONF_HOST: TEST_HOST, CONF_PASSWORD: TEST_PASSWORD}, + user_input={CONF_HOST: TEST_HOST, CONF_PASSWORD: TEST_PASSWORD}, ) - assert result2["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result2["type"] == RESULT_TYPE_FORM assert result2["errors"] == {"base": "cannot_connect"} async def test_form_cannot_connect_port(hass, mock_smile): """Test we handle cannot connect to port error.""" result = await hass.config_entries.flow.async_init( - DOMAIN, context={"source": config_entries.SOURCE_USER} + DOMAIN, context={CONF_SOURCE: SOURCE_USER}, data={FLOW_TYPE: FLOW_NET} ) mock_smile.connect.side_effect = ConnectionFailedError @@ -253,17 +327,21 @@ async def test_form_cannot_connect_port(hass, mock_smile): result2 = await hass.config_entries.flow.async_configure( result["flow_id"], - {CONF_HOST: TEST_HOST, CONF_PASSWORD: TEST_PASSWORD, CONF_PORT: TEST_PORT}, + user_input={ + CONF_HOST: TEST_HOST, + CONF_PASSWORD: TEST_PASSWORD, + CONF_PORT: TEST_PORT, + }, ) - assert result2["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result2["type"] == RESULT_TYPE_FORM assert result2["errors"] == {"base": "cannot_connect"} async def test_form_other_problem(hass, mock_smile): """Test we handle cannot connect error.""" result = await hass.config_entries.flow.async_init( - DOMAIN, context={"source": config_entries.SOURCE_USER} + DOMAIN, context={CONF_SOURCE: SOURCE_USER}, data={FLOW_TYPE: FLOW_NET} ) mock_smile.connect.side_effect = TimeoutError @@ -271,10 +349,10 @@ async def test_form_other_problem(hass, mock_smile): result2 = await hass.config_entries.flow.async_configure( result["flow_id"], - {CONF_HOST: TEST_HOST, CONF_PASSWORD: TEST_PASSWORD}, + user_input={CONF_HOST: TEST_HOST, CONF_PASSWORD: TEST_PASSWORD}, ) - assert result2["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result2["type"] == RESULT_TYPE_FORM assert result2["errors"] == {"base": "unknown"} @@ -298,13 +376,13 @@ async def test_options_flow_power(hass, mock_smile) -> None: result = await hass.config_entries.options.async_init(entry.entry_id) - assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["type"] == RESULT_TYPE_FORM assert result["step_id"] == "init" result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={CONF_SCAN_INTERVAL: 10} ) - assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["type"] == RESULT_TYPE_CREATE_ENTRY assert result["data"] == { CONF_SCAN_INTERVAL: 10, } @@ -330,14 +408,230 @@ async def test_options_flow_thermo(hass, mock_smile) -> None: result = await hass.config_entries.options.async_init(entry.entry_id) - assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["type"] == RESULT_TYPE_FORM assert result["step_id"] == "init" result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={CONF_SCAN_INTERVAL: 60} ) - assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["type"] == RESULT_TYPE_CREATE_ENTRY assert result["data"] == { CONF_SCAN_INTERVAL: 60, } + + +@patch("serial.tools.list_ports.comports", MagicMock(return_value=[com_port()])) +@patch("plugwise.stick.connect", MagicMock(return_value=None)) +@patch("plugwise.stick.initialize_stick", MagicMock(return_value=None)) +@patch("plugwise.stick.disconnect", MagicMock(return_value=None)) +async def test_user_flow_select(hass): + """Test user flow when USB-stick is selected from list.""" + port = com_port() + port_select = f"{port}, s/n: {port.serial_number} - {port.manufacturer}" + + result = await hass.config_entries.flow.async_init( + DOMAIN, context={CONF_SOURCE: SOURCE_USER}, data={FLOW_TYPE: FLOW_USB} + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={CONF_USB_PATH: port_select} + ) + assert result["type"] == RESULT_TYPE_CREATE_ENTRY + assert result["data"] == {PW_TYPE: STICK, CONF_USB_PATH: TEST_USBPORT} + + # Retry to ensure configuring the same port is not allowed + result = await hass.config_entries.flow.async_init( + DOMAIN, context={CONF_SOURCE: SOURCE_USER}, data={FLOW_TYPE: FLOW_USB} + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={CONF_USB_PATH: port_select} + ) + assert result["type"] == RESULT_TYPE_FORM + assert result["errors"] == {"base": "already_configured"} + + +async def test_user_flow_manual_selected_show_form(hass): + """Test user step form when manual path is selected.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={CONF_SOURCE: SOURCE_USER}, data={FLOW_TYPE: FLOW_USB} + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_USB_PATH: CONF_MANUAL_PATH}, + ) + assert result["type"] == RESULT_TYPE_FORM + assert result["step_id"] == "manual_path" + + +@patch( + "homeassistant.components.plugwise.config_flow.validate_usb_connection", + AsyncMock(return_value=[None, FakeStick]), +) +async def test_user_flow_manual(hass): + """Test user flow when USB-stick is manually entered.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={CONF_SOURCE: SOURCE_USER}, data={FLOW_TYPE: FLOW_USB} + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_USB_PATH: CONF_MANUAL_PATH}, + ) + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_USB_PATH: TEST_USBPORT2}, + ) + assert result["type"] == RESULT_TYPE_CREATE_ENTRY + assert result["data"] == {CONF_USB_PATH: TEST_USBPORT2} + + +async def test_invalid_connection(hass): + """Test invalid connection.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={CONF_SOURCE: SOURCE_USER}, data={FLOW_TYPE: FLOW_USB} + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_USB_PATH: CONF_MANUAL_PATH}, + ) + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + {CONF_USB_PATH: "/dev/null"}, + ) + + assert result["type"] == RESULT_TYPE_FORM + assert result["errors"] == {"base": "cannot_connect"} + + +async def test_empty_connection(hass): + """Test empty connection.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={CONF_SOURCE: SOURCE_USER}, data={FLOW_TYPE: FLOW_USB} + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_USB_PATH: CONF_MANUAL_PATH}, + ) + + try: + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + {CONF_USB_PATH: None}, + ) + assert False + except MultipleInvalid: + assert True + + assert result["type"] == RESULT_TYPE_FORM + assert result["errors"] == {} + + +@patch("plugwise.stick.connect", MagicMock(return_value=None)) +@patch("plugwise.stick.initialize_stick", MagicMock(side_effect=(StickInitError))) +async def test_failed_initialization(hass): + """Test we handle failed initialization of Plugwise USB-stick.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={CONF_SOURCE: SOURCE_USER}, + data={FLOW_TYPE: FLOW_USB}, + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_USB_PATH: CONF_MANUAL_PATH}, + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_USB_PATH: "/dev/null"}, + ) + assert result["type"] == "form" + assert result["errors"] == {"base": "stick_init"} + + +@patch("plugwise.stick.connect", MagicMock(return_value=None)) +@patch("plugwise.stick.initialize_stick", MagicMock(side_effect=(NetworkDown))) +async def test_network_down_exception(hass): + """Test we handle network_down exception.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={CONF_SOURCE: SOURCE_USER}, + data={FLOW_TYPE: FLOW_USB}, + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_USB_PATH: CONF_MANUAL_PATH}, + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_USB_PATH: "/dev/null"}, + ) + assert result["type"] == "form" + assert result["errors"] == {"base": "network_down"} + + +@patch("plugwise.stick.connect", MagicMock(return_value=None)) +@patch("plugwise.stick.initialize_stick", MagicMock(side_effect=(TimeoutException))) +async def test_timeout_exception(hass): + """Test we handle time exception.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={CONF_SOURCE: SOURCE_USER}, + data={FLOW_TYPE: FLOW_USB}, + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_USB_PATH: CONF_MANUAL_PATH}, + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_USB_PATH: "/dev/null"}, + ) + assert result["type"] == "form" + assert result["errors"] == {"base": "network_timeout"} + + +def test_get_serial_by_id_no_dir(): + """Test serial by id conversion if there's no /dev/serial/by-id.""" + p1 = patch("os.path.isdir", MagicMock(return_value=False)) + p2 = patch("os.scandir") + with p1 as is_dir_mock, p2 as scan_mock: + res = get_serial_by_id(sentinel.path) + assert res is sentinel.path + assert is_dir_mock.call_count == 1 + assert scan_mock.call_count == 0 + + +def test_get_serial_by_id(): + """Test serial by id conversion.""" + p1 = patch("os.path.isdir", MagicMock(return_value=True)) + p2 = patch("os.scandir") + + def _realpath(path): + if path is sentinel.matched_link: + return sentinel.path + return sentinel.serial_link_path + + p3 = patch("os.path.realpath", side_effect=_realpath) + with p1 as is_dir_mock, p2 as scan_mock, p3: + res = get_serial_by_id(sentinel.path) + assert res is sentinel.path + assert is_dir_mock.call_count == 1 + assert scan_mock.call_count == 1 + + entry1 = MagicMock(spec_set=os.DirEntry) + entry1.is_symlink.return_value = True + entry1.path = sentinel.some_path + + entry2 = MagicMock(spec_set=os.DirEntry) + entry2.is_symlink.return_value = False + entry2.path = sentinel.other_path + + entry3 = MagicMock(spec_set=os.DirEntry) + entry3.is_symlink.return_value = True + entry3.path = sentinel.matched_link + + scan_mock.return_value = [entry1, entry2, entry3] + res = get_serial_by_id(sentinel.path) + assert res is sentinel.matched_link + assert is_dir_mock.call_count == 2 + assert scan_mock.call_count == 2