Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions tests/components/zha/test_cover.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Test zha cover."""
from unittest.mock import call, patch
from unittest.mock import MagicMock, call, patch

import zigpy.types
import zigpy.zcl.clusters.closures as closures
Expand Down Expand Up @@ -33,9 +33,18 @@ async def test_cover(hass, config_entry, zha_gateway):
zha_gateway,
)

# load up cover domain
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
async def get_chan_attr(*args, **kwargs):
return 100

with patch(
"homeassistant.components.zha.core.channels.ZigbeeChannel.get_attribute_value",
new=MagicMock(side_effect=get_chan_attr),
) as get_attr_mock:
# load up cover domain
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
assert get_attr_mock.call_count == 2
assert get_attr_mock.call_args[0][0] == "current_position_lift_percentage"

cluster = zigpy_device.endpoints.get(1).window_covering
zha_device = zha_gateway.get_device(zigpy_device.ieee)
Expand All @@ -47,6 +56,7 @@ async def test_cover(hass, config_entry, zha_gateway):

# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [zha_device])
await hass.async_block_till_done()

attr = make_attribute(8, 100)
hdr = make_zcl_header(zcl_f.Command.Report_Attributes)
Expand All @@ -65,55 +75,51 @@ async def test_cover(hass, config_entry, zha_gateway):

# close from UI
with patch(
"zigpy.zcl.Cluster.request",
return_value=mock_coro([0x1, zcl_f.Status.SUCCESS]),
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x1, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN, "close_cover", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.request.mock_calls) == 1
assert cluster.request.call_count == 1
assert cluster.request.call_args == call(
False, 0x1, (), expect_reply=True, manufacturer=None
)

# open from UI
with patch(
"zigpy.zcl.Cluster.request",
return_value=mock_coro([0x0, zcl_f.Status.SUCCESS]),
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x0, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN, "open_cover", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.request.mock_calls) == 1
assert cluster.request.call_count == 1
assert cluster.request.call_args == call(
False, 0x0, (), expect_reply=True, manufacturer=None
)

# set position UI
with patch(
"zigpy.zcl.Cluster.request",
return_value=mock_coro([0x5, zcl_f.Status.SUCCESS]),
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x5, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN,
"set_cover_position",
{"entity_id": entity_id, "position": 47},
blocking=True,
)
assert len(cluster.request.mock_calls) == 1
assert cluster.request.call_count == 1
assert cluster.request.call_args == call(
False, 0x5, (zigpy.types.uint8_t,), 53, expect_reply=True, manufacturer=None
)

# stop from UI
with patch(
"zigpy.zcl.Cluster.request",
return_value=mock_coro([0x2, zcl_f.Status.SUCCESS]),
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x2, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN, "stop_cover", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.request.mock_calls) == 1
assert cluster.request.call_count == 1
assert cluster.request.call_args == call(
False, 0x2, (), expect_reply=True, manufacturer=None
)
Expand Down