Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 44 additions & 75 deletions homeassistant/components/modbus/switch.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""Support for Modbus switches."""
from abc import ABC
import logging
from typing import Optional
from typing import Any, Dict, Optional

from pymodbus.exceptions import ConnectionException, ModbusException
from pymodbus.pdu import ExceptionResponse
import voluptuous as vol

from homeassistant.components.switch import PLATFORM_SCHEMA
from homeassistant.components.switch import PLATFORM_SCHEMA, SwitchEntity
from homeassistant.const import (
CONF_COMMAND_OFF,
CONF_COMMAND_ON,
Expand All @@ -17,7 +18,9 @@
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity import ToggleEntity
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import ConfigType, HomeAssistantType

from . import ModbusHub
from .const import (
CALL_TYPE_COIL,
CALL_TYPE_REGISTER_HOLDING,
Expand Down Expand Up @@ -76,51 +79,31 @@
)


def setup_platform(hass, config, add_entities, discovery_info=None):
"""Read configuration and create Modbus devices."""
async def async_setup_platform(
hass: HomeAssistantType, config: ConfigType, async_add_entities, discovery_info=None
):
"""Read configuration and create Modbus switches."""
switches = []
if CONF_COILS in config:
for coil in config[CONF_COILS]:
hub_name = coil[CONF_HUB]
hub = hass.data[MODBUS_DOMAIN][hub_name]
switches.append(
ModbusCoilSwitch(
hub, coil[CONF_NAME], coil[CONF_SLAVE], coil[CALL_TYPE_COIL]
)
)
hub: ModbusHub = hass.data[MODBUS_DOMAIN][coil[CONF_HUB]]
switches.append(ModbusCoilSwitch(hub, coil))
if CONF_REGISTERS in config:
for register in config[CONF_REGISTERS]:
hub_name = register[CONF_HUB]
hub = hass.data[MODBUS_DOMAIN][hub_name]

switches.append(
ModbusRegisterSwitch(
hub,
register[CONF_NAME],
register.get(CONF_SLAVE),
register[CONF_REGISTER],
register[CONF_COMMAND_ON],
register[CONF_COMMAND_OFF],
register[CONF_VERIFY_STATE],
register.get(CONF_VERIFY_REGISTER),
register[CONF_REGISTER_TYPE],
register.get(CONF_STATE_ON),
register.get(CONF_STATE_OFF),
)
)
hub: ModbusHub = hass.data[MODBUS_DOMAIN][register[CONF_HUB]]
switches.append(ModbusRegisterSwitch(hub, register))

add_entities(switches)
async_add_entities(switches)


class ModbusCoilSwitch(ToggleEntity, RestoreEntity):
"""Representation of a Modbus coil switch."""
class ModbusBaseSwitch(ToggleEntity, RestoreEntity, ABC):
"""Base class representing a Modbus switch."""

def __init__(self, hub, name, slave, coil):
"""Initialize the coil switch."""
self._hub = hub
self._name = name
self._slave = int(slave) if slave else None
self._coil = int(coil)
def __init__(self, hub: ModbusHub, config: Dict[str, Any]):
"""Initialize the switch."""
self._hub: ModbusHub = hub
self._name = config[CONF_NAME]
self._slave = config.get(CONF_SLAVE)
self._is_on = None
self._available = True

Expand All @@ -146,13 +129,24 @@ def available(self) -> bool:
"""Return True if entity is available."""
return self._available


class ModbusCoilSwitch(ModbusBaseSwitch, SwitchEntity):
"""Representation of a Modbus coil switch."""

def __init__(self, hub: ModbusHub, config: Dict[str, Any]):
"""Initialize the coil switch."""
super().__init__(hub, config)
self._coil = config[CALL_TYPE_COIL]

def turn_on(self, **kwargs):
"""Set switch on."""
self._write_coil(self._coil, True)
self._is_on = True

def turn_off(self, **kwargs):
"""Set switch off."""
self._write_coil(self._coil, False)
self._is_on = False

def update(self):
"""Update the state of the switch."""
Expand All @@ -172,7 +166,7 @@ def _read_coil(self, coil) -> bool:

self._available = True
# bits[0] select the lowest bit in result,
# is_on for a binary_sensor is true if the bit are 1
# is_on for a binary_sensor is true if the bit is 1
# The other bits are not considered.
return bool(result.bits[0] & 1)

Expand All @@ -187,46 +181,21 @@ def _write_coil(self, coil, value):
self._available = True


class ModbusRegisterSwitch(ModbusCoilSwitch):
class ModbusRegisterSwitch(ModbusBaseSwitch, SwitchEntity):
"""Representation of a Modbus register switch."""

# pylint: disable=super-init-not-called
Comment thread
vzahradnik marked this conversation as resolved.
def __init__(
self,
hub,
name,
slave,
register,
command_on,
command_off,
verify_state,
verify_register,
register_type,
state_on,
state_off,
):
def __init__(self, hub: ModbusHub, config: Dict[str, Any]):
"""Initialize the register switch."""
self._hub = hub
self._name = name
self._slave = slave
self._register = register
self._command_on = command_on
self._command_off = command_off
self._verify_state = verify_state
self._verify_register = verify_register if verify_register else self._register
self._register_type = register_type
super().__init__(hub, config)
self._register = config[CONF_REGISTER]
self._command_on = config[CONF_COMMAND_ON]
self._command_off = config[CONF_COMMAND_OFF]
self._state_on = config.get(CONF_STATE_ON, self._command_on)
self._state_off = config.get(CONF_STATE_OFF, self._command_off)
self._verify_state = config[CONF_VERIFY_STATE]
self._verify_register = config.get(CONF_VERIFY_REGISTER, self._register)
self._register_type = config[CONF_REGISTER_TYPE]
self._available = True

if state_on is not None:
self._state_on = state_on
else:
self._state_on = self._command_on

if state_off is not None:
self._state_off = state_off
else:
self._state_off = self._command_off

self._is_on = None

def turn_on(self, **kwargs):
Expand Down