-
-
Notifications
You must be signed in to change notification settings - Fork 37.5k
ZHA cover device support #30639
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ZHA cover device support #30639
Changes from all commits
d64fc37
b5301a1
dc4f8ec
a655508
67428fd
9b2ec59
e3509fe
5860e8b
e748042
0567474
374148e
81cd67a
13fc42f
9663f9b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,176 @@ | ||
| """Support for ZHA covers.""" | ||
| from datetime import timedelta | ||
| import functools | ||
| import logging | ||
|
|
||
| from zigpy.zcl.foundation import Status | ||
|
|
||
| from homeassistant.components.cover import ATTR_POSITION, DOMAIN, CoverDevice | ||
| from homeassistant.const import STATE_CLOSED, STATE_CLOSING, STATE_OPEN, STATE_OPENING | ||
| from homeassistant.core import callback | ||
| from homeassistant.helpers.dispatcher import async_dispatcher_connect | ||
|
|
||
| from .core.const import ( | ||
| CHANNEL_COVER, | ||
| DATA_ZHA, | ||
| DATA_ZHA_DISPATCHERS, | ||
| SIGNAL_ATTR_UPDATED, | ||
| ZHA_DISCOVERY_NEW, | ||
| ) | ||
| from .core.registries import ZHA_ENTITIES | ||
| from .entity import ZhaEntity | ||
|
|
||
| _LOGGER = logging.getLogger(__name__) | ||
|
|
||
| SCAN_INTERVAL = timedelta(minutes=60) | ||
| STRICT_MATCH = functools.partial(ZHA_ENTITIES.strict_match, DOMAIN) | ||
|
|
||
|
|
||
| async def async_setup_platform(hass, config, async_add_entities, discovery_info=None): | ||
| """Old way of setting up Zigbee Home Automation covers.""" | ||
| pass | ||
|
|
||
|
|
||
| async def async_setup_entry(hass, config_entry, async_add_entities): | ||
| """Set up the Zigbee Home Automation cover from config entry.""" | ||
|
|
||
| async def async_discover(discovery_info): | ||
| await _async_setup_entities( | ||
| hass, config_entry, async_add_entities, [discovery_info] | ||
| ) | ||
|
|
||
| unsub = async_dispatcher_connect( | ||
| hass, ZHA_DISCOVERY_NEW.format(DOMAIN), async_discover | ||
| ) | ||
| hass.data[DATA_ZHA][DATA_ZHA_DISPATCHERS].append(unsub) | ||
|
|
||
| covers = hass.data.get(DATA_ZHA, {}).get(DOMAIN) | ||
| if covers is not None: | ||
| await _async_setup_entities( | ||
| hass, config_entry, async_add_entities, covers.values() | ||
| ) | ||
| del hass.data[DATA_ZHA][DOMAIN] | ||
|
|
||
|
|
||
| async def _async_setup_entities( | ||
| hass, config_entry, async_add_entities, discovery_infos | ||
| ): | ||
| """Set up the ZHA covers.""" | ||
| entities = [] | ||
| for discovery_info in discovery_infos: | ||
| zha_dev = discovery_info["zha_device"] | ||
| channels = discovery_info["channels"] | ||
|
|
||
| entity = ZHA_ENTITIES.get_entity(DOMAIN, zha_dev, channels, ZhaCover) | ||
| if entity: | ||
| entities.append(entity(**discovery_info)) | ||
|
|
||
| if entities: | ||
| async_add_entities(entities, update_before_add=True) | ||
|
|
||
|
|
||
| @STRICT_MATCH(channel_names=CHANNEL_COVER) | ||
| class ZhaCover(ZhaEntity, CoverDevice): | ||
| """Representation of a ZHA cover.""" | ||
|
|
||
| def __init__(self, unique_id, zha_device, channels, **kwargs): | ||
| """Init this sensor.""" | ||
| super().__init__(unique_id, zha_device, channels, **kwargs) | ||
| self._cover_channel = self.cluster_channels.get(CHANNEL_COVER) | ||
| self._current_position = None | ||
|
|
||
| async def async_added_to_hass(self): | ||
| """Run when about to be added to hass.""" | ||
| await super().async_added_to_hass() | ||
| await self.async_accept_signal( | ||
| self._cover_channel, SIGNAL_ATTR_UPDATED, self.async_set_position | ||
| ) | ||
|
|
||
| @callback | ||
| def async_restore_last_state(self, last_state): | ||
| """Restore previous state.""" | ||
| self._state = last_state.state | ||
| if "current_position" in last_state.attributes: | ||
| self._current_position = last_state.attributes["current_position"] | ||
|
|
||
| @property | ||
| def is_closed(self): | ||
| """Return if the cover is closed.""" | ||
| if self.current_cover_position is None: | ||
| return None | ||
| return self.current_cover_position == 0 | ||
|
|
||
| @property | ||
| def current_cover_position(self): | ||
| """Return the current position of ZHA cover. | ||
|
|
||
| None is unknown, 0 is closed, 100 is fully open. | ||
| """ | ||
| return self._current_position | ||
|
|
||
| def async_set_position(self, pos): | ||
| """Handle position update from channel.""" | ||
| _LOGGER.debug("setting position: %s", pos) | ||
| self._current_position = 100 - pos | ||
| if self._current_position == 0: | ||
| self._state = STATE_CLOSED | ||
| elif self._current_position == 100: | ||
| self._state = STATE_OPEN | ||
| self.async_schedule_update_ha_state() | ||
|
|
||
| def async_set_state(self, state): | ||
| """Handle state update from channel.""" | ||
| _LOGGER.debug("state=%s", state) | ||
| self._state = state | ||
| self.async_schedule_update_ha_state() | ||
|
|
||
| async def async_open_cover(self, **kwargs): | ||
| """Open the window cover.""" | ||
| res = await self._cover_channel.up_open() | ||
| if isinstance(res, list) and res[1] is Status.SUCCESS: | ||
| self.async_set_state(STATE_OPENING) | ||
|
|
||
| async def async_close_cover(self, **kwargs): | ||
| """Close the window cover.""" | ||
| res = await self._cover_channel.down_close() | ||
| if isinstance(res, list) and res[1] is Status.SUCCESS: | ||
| self.async_set_state(STATE_CLOSING) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With this info we can implement |
||
|
|
||
| async def async_set_cover_position(self, **kwargs): | ||
| """Move the roller shutter to a specific position.""" | ||
| new_pos = kwargs.get(ATTR_POSITION) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Position is required. new_pos = kwargs[ATTR_POSITION] |
||
| res = await self._cover_channel.go_to_lift_percentage(100 - new_pos) | ||
| if isinstance(res, list) and res[1] is Status.SUCCESS: | ||
| self.async_set_state( | ||
| STATE_CLOSING if new_pos < self._current_position else STATE_OPENING | ||
| ) | ||
|
|
||
| async def async_stop_cover(self, **kwargs): | ||
| """Stop the window cover.""" | ||
| res = await self._cover_channel.stop() | ||
| if isinstance(res, list) and res[1] is Status.SUCCESS: | ||
| self._state = STATE_OPEN if self._current_position > 0 else STATE_CLOSED | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use |
||
| self.async_schedule_update_ha_state() | ||
|
|
||
| async def async_update(self): | ||
| """Attempt to retrieve the open/close state of the cover.""" | ||
| await super().async_update() | ||
| await self.async_get_state() | ||
|
|
||
| async def async_get_state(self, from_cache=True): | ||
| """Fetch the current state.""" | ||
| _LOGGER.debug("polling current state") | ||
| if self._cover_channel: | ||
| pos = await self._cover_channel.get_attribute_value( | ||
| "current_position_lift_percentage", from_cache=from_cache | ||
| ) | ||
| _LOGGER.debug("read pos=%s", pos) | ||
|
|
||
| if pos is not None: | ||
| self._current_position = 100 - pos | ||
| self._state = ( | ||
| STATE_OPEN if self.current_cover_position > 0 else STATE_CLOSED | ||
| ) | ||
| else: | ||
| self._current_position = None | ||
| self._state = None | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| """Test zha cover.""" | ||
| from unittest.mock import MagicMock, call, patch | ||
|
|
||
| import zigpy.types | ||
| import zigpy.zcl.clusters.closures as closures | ||
| import zigpy.zcl.clusters.general as general | ||
| import zigpy.zcl.foundation as zcl_f | ||
|
|
||
| from homeassistant.components.cover import DOMAIN | ||
| from homeassistant.const import STATE_CLOSED, STATE_OPEN, STATE_UNAVAILABLE | ||
|
|
||
| from .common import ( | ||
| async_enable_traffic, | ||
| async_init_zigpy_device, | ||
| async_test_device_join, | ||
| find_entity_id, | ||
| make_attribute, | ||
| make_zcl_header, | ||
| ) | ||
|
|
||
| from tests.common import mock_coro | ||
|
|
||
|
|
||
| async def test_cover(hass, config_entry, zha_gateway): | ||
| """Test zha cover platform.""" | ||
|
|
||
| # create zigpy device | ||
| zigpy_device = await async_init_zigpy_device( | ||
| hass, | ||
| [closures.WindowCovering.cluster_id, general.Basic.cluster_id], | ||
| [], | ||
| None, | ||
| zha_gateway, | ||
| ) | ||
|
|
||
| async def get_chan_attr(*args, **kwargs): | ||
| return 100 | ||
|
|
||
| with patch( | ||
| "homeassistant.components.zha.core.channels.ZigbeeChannel.get_attribute_value", | ||
| new=MagicMock(side_effect=get_chan_attr), | ||
| ) as get_attr_mock: | ||
| # load up cover domain | ||
| await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) | ||
| await hass.async_block_till_done() | ||
| assert get_attr_mock.call_count == 2 | ||
| assert get_attr_mock.call_args[0][0] == "current_position_lift_percentage" | ||
|
|
||
| cluster = zigpy_device.endpoints.get(1).window_covering | ||
| zha_device = zha_gateway.get_device(zigpy_device.ieee) | ||
| entity_id = await find_entity_id(DOMAIN, zha_device, hass) | ||
| assert entity_id is not None | ||
|
|
||
| # test that the cover was created and that it is unavailable | ||
| assert hass.states.get(entity_id).state == STATE_UNAVAILABLE | ||
|
|
||
| # allow traffic to flow through the gateway and device | ||
| await async_enable_traffic(hass, zha_gateway, [zha_device]) | ||
| await hass.async_block_till_done() | ||
|
|
||
| attr = make_attribute(8, 100) | ||
| hdr = make_zcl_header(zcl_f.Command.Report_Attributes) | ||
| cluster.handle_message(hdr, [[attr]]) | ||
| await hass.async_block_till_done() | ||
|
|
||
| # test that the state has changed from unavailable to off | ||
| assert hass.states.get(entity_id).state == STATE_CLOSED | ||
|
|
||
| # test to see if it opens | ||
| attr = make_attribute(8, 0) | ||
| hdr = make_zcl_header(zcl_f.Command.Report_Attributes) | ||
| cluster.handle_message(hdr, [[attr]]) | ||
| await hass.async_block_till_done() | ||
| assert hass.states.get(entity_id).state == STATE_OPEN | ||
|
|
||
| # close from UI | ||
| with patch( | ||
| "zigpy.zcl.Cluster.request", return_value=mock_coro([0x1, zcl_f.Status.SUCCESS]) | ||
| ): | ||
| await hass.services.async_call( | ||
| DOMAIN, "close_cover", {"entity_id": entity_id}, blocking=True | ||
| ) | ||
| assert cluster.request.call_count == 1 | ||
| assert cluster.request.call_args == call( | ||
| False, 0x1, (), expect_reply=True, manufacturer=None | ||
| ) | ||
|
|
||
| # open from UI | ||
| with patch( | ||
| "zigpy.zcl.Cluster.request", return_value=mock_coro([0x0, zcl_f.Status.SUCCESS]) | ||
| ): | ||
| await hass.services.async_call( | ||
| DOMAIN, "open_cover", {"entity_id": entity_id}, blocking=True | ||
| ) | ||
| assert cluster.request.call_count == 1 | ||
| assert cluster.request.call_args == call( | ||
| False, 0x0, (), expect_reply=True, manufacturer=None | ||
| ) | ||
|
|
||
| # set position UI | ||
| with patch( | ||
| "zigpy.zcl.Cluster.request", return_value=mock_coro([0x5, zcl_f.Status.SUCCESS]) | ||
| ): | ||
| await hass.services.async_call( | ||
| DOMAIN, | ||
| "set_cover_position", | ||
| {"entity_id": entity_id, "position": 47}, | ||
| blocking=True, | ||
| ) | ||
| assert cluster.request.call_count == 1 | ||
| assert cluster.request.call_args == call( | ||
| False, 0x5, (zigpy.types.uint8_t,), 53, expect_reply=True, manufacturer=None | ||
| ) | ||
|
|
||
| # stop from UI | ||
| with patch( | ||
| "zigpy.zcl.Cluster.request", return_value=mock_coro([0x2, zcl_f.Status.SUCCESS]) | ||
| ): | ||
| await hass.services.async_call( | ||
| DOMAIN, "stop_cover", {"entity_id": entity_id}, blocking=True | ||
| ) | ||
| assert cluster.request.call_count == 1 | ||
| assert cluster.request.call_args == call( | ||
| False, 0x2, (), expect_reply=True, manufacturer=None | ||
| ) | ||
|
|
||
| await async_test_device_join( | ||
| hass, zha_gateway, closures.WindowCovering.cluster_id, entity_id | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With this info we can implement
is_openingcover entity property.https://github.com/home-assistant/home-assistant/blob/4dc39492a57c505c1915e679f06198204458763c/homeassistant/components/cover/__init__.py#L220-L223