|
1 | 1 | #
|
2 |
| -# Copyright (c) 2023 Project CHIP Authors |
| 2 | +# Copyright (c) 2024 Project CHIP Authors |
3 | 3 | # All rights reserved.
|
4 | 4 | #
|
5 | 5 | # Licensed under the Apache License, Version 2.0 (the "License");
|
|
30 | 30 | import logging
|
31 | 31 | import queue
|
32 | 32 | import time
|
| 33 | +from datetime import datetime, timedelta |
33 | 34 | from typing import Any
|
34 | 35 |
|
35 | 36 | import chip.clusters as Clusters
|
| 37 | +import test_plan_support |
36 | 38 | from chip.clusters import ClusterObjects as ClusterObjects
|
37 | 39 | from chip.clusters.Attribute import EventReadResult, TypedAttributePath
|
38 | 40 | from matter_testing_support import (AttributeValue, ClusterAttributeChangeAccumulator, EventChangeCallback, MatterBaseTest,
|
39 |
| - async_test_body, default_matter_test_main) |
| 41 | + TestStep, async_test_body, default_matter_test_main) |
40 | 42 | from mobly import asserts
|
41 | 43 |
|
42 | 44 | logger = logging.getLogger(__name__)
|
@@ -93,6 +95,25 @@ def _ask_for_long_press(self, endpoint_id: int, pressed_position: int):
|
93 | 95 | "ButtonId": pressed_position, "LongPressDelayMillis": 5000, "LongPressDurationMillis": 5500}
|
94 | 96 | self._send_named_pipe_command(command_dict)
|
95 | 97 |
|
| 98 | + def _ask_for_keep_pressed(self, endpoint_id: int, pressed_position: int): |
| 99 | + if not self._use_button_simulator(): |
| 100 | + self.wait_for_user_input( |
| 101 | + prompt_msg=f"Press switch position {pressed_position} for a long time (around 5 seconds) on the DUT, then release it.") |
| 102 | + else: |
| 103 | + # Using the long press here with a long duration so we can check the intermediate value. |
| 104 | + command_dict = {"Name": "SimulateActionSwitchLongPress", "EndpointId": endpoint_id, |
| 105 | + "ButtonId": pressed_position, "LongPressDelayMillis": 0, "LongPressDurationMillis": self.keep_pressed_delay} |
| 106 | + self._send_named_pipe_command(command_dict) |
| 107 | + |
| 108 | + def _ask_for_release(self): |
| 109 | + # Since we used a long press for this, "ask for release" on the button simulator just means waiting out the delay |
| 110 | + if not self._use_button_simulator(): |
| 111 | + self.wait_for_user_input( |
| 112 | + prompt_msg="Release the button." |
| 113 | + ) |
| 114 | + else: |
| 115 | + time.sleep(self.keep_pressed_delay/1000) |
| 116 | + |
96 | 117 | def _placeholder_for_step(self, step_id: str):
|
97 | 118 | # TODO: Global search an replace of `self._placeholder_for_step` with `self.step` when done.
|
98 | 119 | logging.info(f"Step {step_id}")
|
@@ -285,6 +306,96 @@ async def test_TC_SWTCH_2_4(self):
|
285 | 306 | self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id,
|
286 | 307 | sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds)
|
287 | 308 |
|
| 309 | + def _received_event(self, event_listener: EventChangeCallback, target_event: ClusterObjects.ClusterEvent, timeout_s: int) -> bool: |
| 310 | + """ |
| 311 | + Returns true if this event was received, false otherwise |
| 312 | + """ |
| 313 | + remaining = timedelta(seconds=timeout_s) |
| 314 | + end_time = datetime.now() + remaining |
| 315 | + while (remaining.seconds > 0): |
| 316 | + try: |
| 317 | + event = event_listener.event_queue.get(timeout=remaining.seconds) |
| 318 | + except queue.Empty: |
| 319 | + return False |
| 320 | + |
| 321 | + if event.Header.EventId == target_event.event_id: |
| 322 | + return True |
| 323 | + remaining = end_time - datetime.now() |
| 324 | + return False |
| 325 | + |
| 326 | + def pics_TC_SWTCH_2_3(self): |
| 327 | + return ['SWTCH.S.F01'] |
| 328 | + |
| 329 | + def steps_TC_SWTCH_2_3(self): |
| 330 | + return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True), |
| 331 | + TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"), |
| 332 | + TestStep(3, "Operator does not operate switch on the DUT"), |
| 333 | + TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), |
| 334 | + TestStep(5, "Operator operates switch (keep it pressed)", |
| 335 | + "Verify that the TH receives InitialPress event with NewPosition set to 1 on the DUT"), |
| 336 | + TestStep(6, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1"), |
| 337 | + TestStep(7, "Operator releases switch on the DUT"), |
| 338 | + TestStep("8a", "If the DUT implements the MSR feature, verify that the TH receives ShortRelease event with NewPosition set to 0 on the DUT", "Event received"), |
| 339 | + TestStep( |
| 340 | + "8b", "If the DUT implements the AS feature, verify that the TH does not receive ShortRelease event on the DUT", "No event received"), |
| 341 | + TestStep(9, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), |
| 342 | + ] |
| 343 | + |
| 344 | + @async_test_body |
| 345 | + async def test_TC_SWTCH_2_3(self): |
| 346 | + # Commissioning - already done |
| 347 | + self.step(1) |
| 348 | + cluster = Clusters.Switch |
| 349 | + feature_map = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.FeatureMap) |
| 350 | + |
| 351 | + has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0 |
| 352 | + has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0 |
| 353 | + |
| 354 | + endpoint_id = self.matter_test_config.endpoint |
| 355 | + |
| 356 | + self.step(2) |
| 357 | + event_listener = EventChangeCallback(cluster) |
| 358 | + await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) |
| 359 | + |
| 360 | + self.step(3) |
| 361 | + self._ask_for_switch_idle() |
| 362 | + |
| 363 | + self.step(4) |
| 364 | + button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) |
| 365 | + asserts.assert_equal(button_val, 0, "Button value is not 0") |
| 366 | + |
| 367 | + self.step(5) |
| 368 | + # We're using a long press here with a very long duration (in computer-land). This will let us check the intermediate values. |
| 369 | + # This is 1s larger than the subscription ceiling |
| 370 | + self.keep_pressed_delay = 6000 |
| 371 | + self.pressed_position = 1 |
| 372 | + self._ask_for_keep_pressed(endpoint_id, self.pressed_position) |
| 373 | + event_listener.wait_for_event_report(cluster.Events.InitialPress) |
| 374 | + |
| 375 | + self.step(6) |
| 376 | + button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) |
| 377 | + asserts.assert_equal(button_val, self.pressed_position, f"Button value is not {self.pressed_position}") |
| 378 | + |
| 379 | + self.step(7) |
| 380 | + self._ask_for_release() |
| 381 | + |
| 382 | + self.step("8a") |
| 383 | + if has_msr_feature: |
| 384 | + asserts.assert_true(self._received_event(event_listener, cluster.Events.ShortRelease, 10), |
| 385 | + "Did not receive short release") |
| 386 | + else: |
| 387 | + self.mark_current_step_skipped() |
| 388 | + |
| 389 | + self.step("8b") |
| 390 | + if has_as_feature: |
| 391 | + asserts.assert_false(self._received_event(event_listener, cluster.Events.ShortRelease, 10), "Received short release") |
| 392 | + else: |
| 393 | + self.mark_current_step_skipped() |
| 394 | + |
| 395 | + self.step(9) |
| 396 | + button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) |
| 397 | + asserts.assert_equal(button_val, 0, "Button value is not 0") |
| 398 | + |
288 | 399 |
|
289 | 400 | if __name__ == "__main__":
|
290 | 401 | default_matter_test_main()
|
0 commit comments