Skip to content

Commit

Permalink
[Mbed] Add to flake8 in workflow and fix python files (part #25193) (#…
Browse files Browse the repository at this point in the history
…25249)

* [Mbed] Add to flake8 in workflow and fix python files

* Fix after review
  • Loading branch information
DamMicSzm authored and pull[bot] committed Jan 29, 2024
1 parent 9c93d87 commit 4535176
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 131 deletions.
2 changes: 0 additions & 2 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ exclude = third_party
# temporarily scan only directories with fixed files
# TODO: Remove the paths below when all bugs are fixed
src/tools/chip-cert/*
src/test_driver/mbed/*
build/chip/java/tests/*
build/chip/linux/*
build/config/linux/*
credentials/fetch-paa-certs-from-dcl.py
docs/_extensions/external_content.py
examples/common/pigweed/rpc_console/py/chip_rpc/console.py
examples/lighting-app/python/lighting.py
examples/platform/mbed/ota/generate_ota_list_image.py
scripts/build/build/target.py
scripts/build/build/targets.py
scripts/build/builders/android.py
Expand Down
1 change: 0 additions & 1 deletion examples/platform/mbed/ota/generate_ota_list_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import json
import os
import string
import sys

FILE_NAME = "ota-image-list.json"
Expand Down
1 change: 0 additions & 1 deletion src/test_driver/mbed/integration_tests/common/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ def wait_for_output(self, search: str, timeout: float = 10, assert_timeout: bool
if line:
lines.append(line)
if search in line:
end = time()
return lines

except queue.Empty:
Expand Down
2 changes: 1 addition & 1 deletion src/test_driver/mbed/integration_tests/common/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import logging
from time import sleep
from typing import Any, List, Mapping, Optional
from typing import Any, List, Mapping

import mbed_lstools
import pytest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import importlib
import os
from pathlib import Path

from pw_hdlc.rpc import HdlcRpcClient, default_channels


Expand Down
31 changes: 18 additions & 13 deletions src/test_driver/mbed/integration_tests/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@


import logging
import os
import platform
import random
import re
Expand Down Expand Up @@ -48,7 +47,7 @@ def get_device_details(device):
:return: device details dictionary or None
"""
ret = device.wait_for_output("SetupQRCode")
if ret == None or len(ret) < 2:
if ret is None or len(ret) < 2:
return None

qr_code = re.sub(
Expand Down Expand Up @@ -119,13 +118,13 @@ def send_zcl_command(devCtrl, line):
raise exceptions.UnknownCluster(args[0])
command = all_commands.get(args[0]).get(args[1], None)
# When command takes no arguments, (not command) is True
if command == None:
if command is None:
raise exceptions.UnknownCommand(args[0], args[1])
err, res = devCtrl.ZCLSend(args[0], args[1], int(
args[2]), int(args[3]), int(args[4]), FormatZCLArguments(args[5:], command), blocking=True)
if err != 0:
log.error("Failed to send ZCL command [{}] {}.".format(err, res))
elif res != None:
elif res is not None:
log.info("Success, received command response:")
log.info(res)
else:
Expand All @@ -145,7 +144,7 @@ def send_zcl_command(devCtrl, line):
def scan_chip_ble_devices(devCtrl):
"""
BLE scan CHIP device
BLE scanning for 10 seconds and collect the results
BLE scanning for 10 seconds and collect the results
:param devCtrl: device controller instance
:return: List of visible BLE devices
"""
Expand Down Expand Up @@ -173,14 +172,14 @@ def check_chip_ble_devices_advertising(devCtrl, name, deviceDetails=None):
:return: True if device advertise else False
"""
ble_chip_device = scan_chip_ble_devices(devCtrl)
if ble_chip_device == None or len(ble_chip_device) == 0:
if ble_chip_device is None or len(ble_chip_device) == 0:
log.info("No BLE CHIP device found")
return False

chip_device_found = False

for ble_device in ble_chip_device:
if deviceDetails != None:
if deviceDetails is not None:
if (ble_device["name"] == name and
int(ble_device["discriminator"]) == int(deviceDetails["Discriminator"]) and
int(ble_device["vendorId"]) == int(deviceDetails["VendorID"]) and
Expand All @@ -197,14 +196,14 @@ def check_chip_ble_devices_advertising(devCtrl, name, deviceDetails=None):

def connect_device_over_ble(devCtrl, discriminator, pinCode, nodeId=None):
"""
Connect to Matter accessory device over BLE
Connect to Matter accessory device over BLE
:param devCtrl: device controller instance
:param discriminator: CHIP device discriminator
:param pinCode: CHIP device pin code
:param nodeId: default value of node ID
:return: node ID is provisioning successful, otherwise None
"""
if nodeId == None:
if nodeId is None:
nodeId = random.randint(1, 1000000)

try:
Expand Down Expand Up @@ -248,7 +247,7 @@ def close_ble(devCtrl):

def commissioning_wifi(devCtrl, ssid, password, nodeId):
"""
Commissioning a Wi-Fi device
Commissioning a Wi-Fi device
:param devCtrl: device controller instance
:param ssid: network ssid
:param password: network password
Expand All @@ -257,8 +256,14 @@ def commissioning_wifi(devCtrl, ssid, password, nodeId):
"""

# Inject the credentials to the device
err, res = send_zcl_command(
devCtrl, "NetworkCommissioning AddOrUpdateWiFiNetwork {} 0 0 ssid=str:{} credentials=str:{} breadcrumb=0 timeoutMs=1000".format(nodeId, ssid, password))
err, res = send_zcl_command(devCtrl,
"NetworkCommissioning "
"AddOrUpdateWiFiNetwork {} 0 0 "
"ssid=str:{} credentials=str:{} breadcrumb=0 timeoutMs=1000".format(
nodeId,
ssid,
password
))
if err != 0 and res["Status"] != 0:
log.error("Set Wi-Fi credentials failed [{}]".format(err))
return err
Expand All @@ -284,7 +289,7 @@ def resolve_device(devCtrl, nodeId):
try:
devCtrl.ResolveNode(int(nodeId))
ret = devCtrl.GetAddressAndPort(int(nodeId))
if ret == None:
if ret is None:
log.error("Get address and port failed")
except exceptions.ChipStackException as ex:
log.error("Resolve node failed {}".format(str(ex)))
Expand Down
2 changes: 0 additions & 2 deletions src/test_driver/mbed/integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

pytest_plugins = ['common.fixtures']


Expand Down
71 changes: 36 additions & 35 deletions src/test_driver/mbed/integration_tests/lighting-app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from button_service import button_service_pb2
from chip import ChipDeviceCtrl
from common.pigweed_client import PigweedClient
from common.utils import *
from common.utils import (check_chip_ble_devices_advertising, close_ble, close_connection, commissioning_wifi,
connect_device_over_ble, get_device_details, resolve_device, send_zcl_command)
from device_service import device_service_pb2
from lighting_service import lighting_service_pb2
from pw_status import Status
Expand All @@ -37,9 +38,9 @@
def test_smoke_test(device):
device.reset(duration=1)
ret = device.wait_for_output("Mbed lighting-app example application start")
assert ret != None and len(ret) > 0
assert ret is not None and len(ret) > 0
ret = device.wait_for_output("Mbed lighting-app example application run")
assert ret != None and len(ret) > 0
assert ret is not None and len(ret) > 0


def test_wifi_provisioning(device, network):
Expand All @@ -49,35 +50,34 @@ def test_wifi_provisioning(device, network):
devCtrl = ChipDeviceCtrl.ChipDeviceController()

device_details = get_device_details(device)
assert device_details != None and len(device_details) != 0
assert device_details is not None and len(device_details) != 0

assert check_chip_ble_devices_advertising(
devCtrl, BLE_DEVICE_NAME, device_details)

ret = connect_device_over_ble(devCtrl, int(device_details["Discriminator"]), int(
device_details["SetUpPINCode"]), DEVICE_NODE_ID)
assert ret != None and ret == DEVICE_NODE_ID
assert ret is not None and ret == DEVICE_NODE_ID

ret = device.wait_for_output("Device completed Rendezvous process")
assert ret != None and len(ret) > 0
assert ret is not None and len(ret) > 0

ret = commissioning_wifi(devCtrl, network_ssid,
network_pass, DEVICE_NODE_ID)
assert ret == 0

ret = device.wait_for_output("StationConnected")
assert ret != None and len(ret) > 0
assert ret is not None and len(ret) > 0

ret = device.wait_for_output("address set")
assert ret != None and len(ret) > 0
assert ret is not None and len(ret) > 0

device_ip_address = ret[-1].partition("address set:")[2].strip()

ret = resolve_device(devCtrl, DEVICE_NODE_ID)
assert ret != None and len(ret) == 2
assert ret is not None and len(ret) == 2

ip_address = ret[0]
port = ret[1]

assert device_ip_address == ip_address

Expand All @@ -92,53 +92,54 @@ def test_light_ctrl(device, network):
devCtrl = ChipDeviceCtrl.ChipDeviceController()

device_details = get_device_details(device)
assert device_details != None and len(device_details) != 0
assert device_details is not None and len(device_details) != 0

assert check_chip_ble_devices_advertising(
devCtrl, BLE_DEVICE_NAME, device_details)

ret = connect_device_over_ble(devCtrl, int(device_details["Discriminator"]), int(
device_details["SetUpPINCode"]), DEVICE_NODE_ID)
assert ret != None and ret == DEVICE_NODE_ID
assert ret is not None and ret == DEVICE_NODE_ID

ret = device.wait_for_output("Device completed Rendezvous process")
assert ret != None and len(ret) > 0
assert ret is not None and len(ret) > 0

ret = commissioning_wifi(devCtrl, network_ssid,
network_pass, DEVICE_NODE_ID)
assert ret == 0

ret = resolve_device(devCtrl, DEVICE_NODE_ID)
assert ret != None and len(ret) == 2
assert ret is not None and len(ret) == 2

err, res = send_zcl_command(
devCtrl, "OnOff On {} 1 0".format(DEVICE_NODE_ID))
assert err == 0

ret = device.wait_for_output("Turn On Action has been completed", 20)
assert ret != None and len(ret) > 0
assert ret is not None and len(ret) > 0

err, res = send_zcl_command(
devCtrl, "OnOff Off {} 1 0".format(DEVICE_NODE_ID))
assert err == 0

ret = device.wait_for_output("Turn Off Action has been completed", 20)
assert ret != None and len(ret) > 0
assert ret is not None and len(ret) > 0

err, res = send_zcl_command(
devCtrl, "OnOff Toggle {} 1 0".format(DEVICE_NODE_ID))
assert err == 0

ret = device.wait_for_output("Turn On Action has been completed", 20)
assert ret != None and len(ret) > 0
assert ret is not None and len(ret) > 0

err, res = send_zcl_command(devCtrl, "LevelControl MoveToLevel {} 1 0 level={} transitionTime=1 optionMask=0 optionOverride=0".format(
DEVICE_NODE_ID, TEST_BRIGHTNESS_LEVEL))
err, res = send_zcl_command(devCtrl,
"LevelControl MoveToLevel {} 1 0 level={} transitionTime=1 optionMask=0 optionOverride=0".format(
DEVICE_NODE_ID, TEST_BRIGHTNESS_LEVEL))
assert err == 0

ret = device.wait_for_output(
"Setting brightness level to {}".format(TEST_BRIGHTNESS_LEVEL), 20)
assert ret != None and len(ret) > 0
assert ret is not None and len(ret) > 0

assert close_connection(devCtrl, DEVICE_NODE_ID)
assert close_ble(devCtrl)
Expand All @@ -147,11 +148,11 @@ def test_light_ctrl(device, network):
def test_device_info_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() == True
assert payload.vendor_id != None and payload.product_id != None and payload.serial_number != None
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None

device_details = get_device_details(device)
assert device_details != None and len(device_details) != 0
assert device_details is not None and len(device_details) != 0

assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
Expand All @@ -163,7 +164,7 @@ def test_device_info_rpc(device):
def test_device_factory_reset_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() == True
assert status.ok() is True


def test_device_reboot_rpc(device):
Expand All @@ -183,39 +184,39 @@ def test_ligth_ctrl_rpc(device):

# Check light on
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=True)
assert status.ok() == True
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() == True
assert payload.on == True
assert status.ok() is True
assert payload.on is True

# Check light off
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=False)
assert status.ok() == True
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() == True
assert payload.on == False
assert status.ok() is True
assert payload.on is False


def test_button_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)

# Check button 0 (lighting)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() == True
assert status.ok() is True
initial_state = bool(payload.on)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() == True
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() == True
assert status.ok() is True
assert payload.on == compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() == True
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() == True
assert status.ok() is True
assert payload.on == compare_state
Loading

0 comments on commit 4535176

Please sign in to comment.