Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 55 additions & 162 deletions homeassistant/components/binary_sensor/zha.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
0x002d: 'vibration',
}

DEVICE_CLASS_OPENING = 'opening'


async def async_setup_platform(hass, config, async_add_devices,
discovery_info=None):
Expand All @@ -37,8 +39,8 @@ async def async_setup_platform(hass, config, async_add_devices,
await _async_setup_iaszone(hass, config, async_add_devices,
discovery_info)
elif OnOff.cluster_id in discovery_info['out_clusters']:
await _async_setup_remote(hass, config, async_add_devices,
discovery_info)
await _async_setup_onoff(hass, config, async_add_devices,
discovery_info, DEVICE_CLASS_OPENING)


async def _async_setup_iaszone(hass, config, async_add_devices,
Expand All @@ -58,47 +60,57 @@ async def _async_setup_iaszone(hass, config, async_add_devices,
# If we fail to read from the device, use a non-specific class
pass

sensor = BinarySensor(device_class, **discovery_info)
async_add_devices([sensor], update_before_add=True)

sensor = IasZoneSensor(device_class, **discovery_info)
async_add_devices([sensor])

async def _async_setup_remote(hass, config, async_add_devices, discovery_info):

async def safe(coro):
"""Run coro, catching ZigBee delivery errors, and ignoring them."""
import zigpy.exceptions
try:
await coro
except zigpy.exceptions.DeliveryError as exc:
_LOGGER.warning("Ignoring error during setup: %s", exc)
async def _async_setup_onoff(hass, config, async_add_devices,
discovery_info, device_class):
sensor = BinarySensor(device_class, **discovery_info)

if discovery_info['new_join']:
from zigpy.zcl.clusters.general import OnOff, LevelControl
out_clusters = discovery_info['out_clusters']
if OnOff.cluster_id in out_clusters:
cluster = out_clusters[OnOff.cluster_id]
await safe(cluster.bind())
await safe(cluster.configure_reporting(0, 0, 600, 1))
if LevelControl.cluster_id in out_clusters:
cluster = out_clusters[LevelControl.cluster_id]
await safe(cluster.bind())
await safe(cluster.configure_reporting(0, 1, 600, 1))
from zigpy.exceptions import ZigbeeException
from zigpy.zcl.clusters.general import OnOff
in_clusters = discovery_info['in_clusters']
endpoint = discovery_info['endpoint']
cluster = in_clusters[OnOff.cluster_id]
attr, min_report, max_report, report_change = [0, 0, 600, 1]
try:
await cluster.bind()
except ZigbeeException as ex:
_LOGGER.debug("Failed to bind {}-{}-{}: {}".
format(endpoint.device.ieee, endpoint.endpoint_id,
cluster.cluster_id, ex))
try:
await cluster.configure_reporting(attr, min_report,
max_report, report_change)
except ZigbeeException as ex:
_LOGGER.debug(
"Failed to configure reporting for attr {} on {}-{}-{}: {}"
.format(attr, endpoint.device.ieee, endpoint.endpoint_id,
cluster.cluster_id, ex))

sensor = Switch(**discovery_info)
async_add_devices([sensor], update_before_add=True)
async_add_devices([sensor])


class BinarySensor(zha.Entity, BinarySensorDevice):
"""The ZHA Binary Sensor."""
"""ZHA Binary Sensor."""

_domain = DOMAIN
_device_class = None
value_attribute = 0

def __init__(self, device_class, **kwargs):
"""Initialize the ZHA binary sensor."""
super().__init__(**kwargs)
self._device_class = device_class
from zigpy.zcl.clusters.security import IasZone
self._ias_zone_cluster = self._in_clusters[IasZone.cluster_id]

def attribute_updated(self, attribute, value):
"""Handle attribute update from device."""
_LOGGER.debug("Attribute updated: %s %s %s", self, attribute, value)
if attribute == self.value_attribute:
self._state = bool(value)
self.async_schedule_update_ha_state()

@property
def should_poll(self) -> bool:
Expand All @@ -107,152 +119,33 @@ def should_poll(self) -> bool:

@property
def is_on(self) -> bool:
"""Return True if entity is on."""
"""Return if the switch is on based on the statemachine."""
if self._state is None:
return False
return bool(self._state)
return self._state

@property
def device_class(self):
"""Return the class of this device, from component DEVICE_CLASSES."""
def device_class(self) -> str:
"""Return device class from component DEVICE_CLASSES."""
return self._device_class


class IasZoneSensor(BinarySensor, BinarySensorDevice):
"""The ZHA Binary Sensor."""

def __init__(self, device_class, **kwargs):
"""Initialize the ZHA binary sensor."""
super().__init__(device_class, **kwargs)
from zigpy.zcl.clusters.security import IasZone
self._ias_zone_cluster = self._in_clusters[IasZone.cluster_id]

def cluster_command(self, tsn, command_id, args):
"""Handle commands received to this cluster."""
if command_id == 0:
self._state = args[0] & 3
self._state = bool(args[0] & 3)
_LOGGER.debug("Updated alarm state: %s", self._state)
self.async_schedule_update_ha_state()
elif command_id == 1:
_LOGGER.debug("Enroll requested")
res = self._ias_zone_cluster.enroll_response(0, 0)
self.hass.async_add_job(res)

async def async_update(self):
"""Retrieve latest state."""
from bellows.types.basic import uint16_t

result = await zha.safe_read(self._endpoint.ias_zone,
['zone_status'],
allow_cache=False)
state = result.get('zone_status', self._state)
if isinstance(state, (int, uint16_t)):
self._state = result.get('zone_status', self._state) & 3


class Switch(zha.Entity, BinarySensorDevice):
"""ZHA switch/remote controller/button."""

_domain = DOMAIN

class OnOffListener:
"""Listener for the OnOff ZigBee cluster."""

def __init__(self, entity):
"""Initialize OnOffListener."""
self._entity = entity

def cluster_command(self, tsn, command_id, args):
"""Handle commands received to this cluster."""
if command_id in (0x0000, 0x0040):
self._entity.set_state(False)
elif command_id in (0x0001, 0x0041, 0x0042):
self._entity.set_state(True)
elif command_id == 0x0002:
self._entity.set_state(not self._entity.is_on)

def attribute_updated(self, attrid, value):
"""Handle attribute updates on this cluster."""
if attrid == 0:
self._entity.set_state(value)

def zdo_command(self, *args, **kwargs):
"""Handle ZDO commands on this cluster."""
pass

class LevelListener:
"""Listener for the LevelControl ZigBee cluster."""

def __init__(self, entity):
"""Initialize LevelListener."""
self._entity = entity

def cluster_command(self, tsn, command_id, args):
"""Handle commands received to this cluster."""
if command_id in (0x0000, 0x0004): # move_to_level, -with_on_off
self._entity.set_level(args[0])
elif command_id in (0x0001, 0x0005): # move, -with_on_off
# We should dim slowly -- for now, just step once
rate = args[1]
if args[0] == 0xff:
rate = 10 # Should read default move rate
self._entity.move_level(-rate if args[0] else rate)
elif command_id in (0x0002, 0x0006): # step, -with_on_off
# Step (technically may change on/off)
self._entity.move_level(-args[1] if args[0] else args[1])

def attribute_update(self, attrid, value):
"""Handle attribute updates on this cluster."""
if attrid == 0:
self._entity.set_level(value)

def zdo_command(self, *args, **kwargs):
"""Handle ZDO commands on this cluster."""
pass

def __init__(self, **kwargs):
"""Initialize Switch."""
super().__init__(**kwargs)
self._state = False
self._level = 0
from zigpy.zcl.clusters import general
self._out_listeners = {
general.OnOff.cluster_id: self.OnOffListener(self),
general.LevelControl.cluster_id: self.LevelListener(self),
}

@property
def should_poll(self) -> bool:
"""Let zha handle polling."""
return False

@property
def is_on(self) -> bool:
"""Return true if the binary sensor is on."""
return self._state

@property
def device_state_attributes(self):
"""Return the device state attributes."""
self._device_state_attributes.update({
'level': self._state and self._level or 0
})
return self._device_state_attributes

def move_level(self, change):
"""Increment the level, setting state if appropriate."""
if not self._state and change > 0:
self._level = 0
self._level = min(255, max(0, self._level + change))
self._state = bool(self._level)
self.async_schedule_update_ha_state()

def set_level(self, level):
"""Set the level, setting state if appropriate."""
self._level = level
self._state = bool(self._level)
self.async_schedule_update_ha_state()

def set_state(self, state):
"""Set the state."""
self._state = state
if self._level == 0:
self._level = 255
self.async_schedule_update_ha_state()

async def async_update(self):
"""Retrieve latest state."""
from zigpy.zcl.clusters.general import OnOff
result = await zha.safe_read(
self._endpoint.out_clusters[OnOff.cluster_id], ['on_off'])
self._state = result.get('on_off', self._state)
Loading