Skip to content

Commit

Permalink
Fixed linting issues
Browse files Browse the repository at this point in the history
  • Loading branch information
austina-csa committed Dec 5, 2024
1 parent fa4f645 commit 5c4c6e4
Showing 1 changed file with 30 additions and 22 deletions.
52 changes: 30 additions & 22 deletions src/python_testing/TC_IDM_2_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,12 @@ async def test_TC_IDM_2_2(self):

for endpoint in read_request:
for cluster in read_request[endpoint]:
if endpoint != 1 or cluster != Clusters.ModeSelect: # Endpoint 1 seems an issue with ModeSelect (ServerList has an extra 4293984257)
returned_attrs = sorted([x.attribute_id for x in read_request[endpoint][cluster].keys() if x != Clusters.Attribute.DataVersion])
attr_list = sorted([x for x in read_request[endpoint][cluster][cluster.Attributes.AttributeList] if x != Clusters.UnitTesting.Attributes.WriteOnlyInt8u.attribute_id])
# Endpoint 1 seems an issue with ModeSelect (ServerList has an extra 4293984257)
if endpoint != 1 or cluster != Clusters.ModeSelect:
returned_attrs = sorted([x.attribute_id for x in read_request[endpoint]
[cluster].keys() if x != Clusters.Attribute.DataVersion])
attr_list = sorted([x for x in read_request[endpoint][cluster][cluster.Attributes.AttributeList]
if x != Clusters.UnitTesting.Attributes.WriteOnlyInt8u.attribute_id])
asserts.assert_equal(returned_attrs, attr_list, f"Mismatch for {cluster} at endpoint {endpoint}")

# Step 6
Expand Down Expand Up @@ -299,14 +302,14 @@ async def test_TC_IDM_2_2(self):

asserts.assert_in(Clusters.Objects.Descriptor, read_request[0].keys(), "Descriptor cluster not in output")
asserts.assert_in(Clusters.Objects.Descriptor.Attributes.ServerList,
read_request[0][Clusters.Objects.Descriptor], "ServerList not in output")
read_request[0][Clusters.Objects.Descriptor], "ServerList not in output")

for cluster in read_request[0]:
attribute_ids = [a.attribute_id for a in read_request[0]
[cluster].keys() if a != Clusters.Attribute.DataVersion]
[cluster].keys() if a != Clusters.Attribute.DataVersion]
asserts.assert_equal(sorted(attribute_ids),
sorted(read_request[0][cluster][cluster.Attributes.AttributeList]),
"Expected attribute list does not match actual list for cluster {cluster} on endpoint 0")
sorted(read_request[0][cluster][cluster.Attributes.AttributeList]),
"Expected attribute list does not match actual list for cluster {cluster} on endpoint 0")
self.verify_attribute_list_cluster(read_request, 0, Clusters.Descriptor)

# Step 9
Expand Down Expand Up @@ -407,7 +410,8 @@ async def test_TC_IDM_2_2(self):
all_clusters = set(list(ClusterObjects.ALL_CLUSTERS.keys()))

for endpoint_id, endpoint in self.endpoints.items():
dut_standard_clusters = set([x.id for x in endpoint.keys() if global_attribute_ids.cluster_id_type(x.id) == global_attribute_ids.ClusterIdType.kStandard])
dut_standard_clusters = set([x.id for x in endpoint.keys() if global_attribute_ids.cluster_id_type(
x.id) == global_attribute_ids.ClusterIdType.kStandard])
unsupported = [id for id in list(all_clusters - dut_standard_clusters) if global_attribute_ids.attribute_id_type(id)
== global_attribute_ids.AttributeIdType.kStandardNonGlobal]
if unsupported:
Expand Down Expand Up @@ -569,15 +573,15 @@ async def test_TC_IDM_2_2(self):
EndpointId=0,
ClusterId=None,
AttributeId=Clusters.Descriptor.Attributes.ServerList.attribute_id)
# AttributeId=global_attribute_ids.GlobalAttributeIds.ATTRIBUTE_LIST_ID)
# AttributeId=global_attribute_ids.GlobalAttributeIds.ATTRIBUTE_LIST_ID)
try:
read_request = await self.default_controller.ReadAttribute(
self.dut_node_id,
[attribute_path]
)
except ChipStackError as e:
asserts.assert_equal(e.err, 0x580,
"Incorrect error response for reading non-global attribute on all clusters at endpoint 0")
"Incorrect error response for reading non-global attribute on all clusters at endpoint 0")

# Step 30

Expand All @@ -586,11 +590,11 @@ async def test_TC_IDM_2_2(self):

# On the TH verify that the DUT sends an error message and not the value of the attribute.
self.print_step(30, "Send the Read Request Message to the DUT to read a non global attribute from all clusters at all Endpoints")

endpoint = 0
read_request = await self.default_controller.Read(self.dut_node_id,
[(endpoint, Clusters.AccessControl.Attributes.Acl)],
)
[(endpoint, Clusters.AccessControl.Attributes.Acl)],
)

dut_acl_original = read_request.attributes[endpoint][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl]

Expand All @@ -606,19 +610,22 @@ async def test_TC_IDM_2_2(self):

result = await self.default_controller.WriteAttribute(self.dut_node_id, [(endpoint, Clusters.AccessControl.Attributes.Acl(dut_acl))])
read_request = await self.default_controller.Read(self.dut_node_id,
[(endpoint, Clusters.AccessControl.Attributes.Acl)],
)
[(endpoint, Clusters.AccessControl.Attributes.Acl)],
)

asserts.assert_equal(len(read_request.attributes[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl]), 2)

asserts.assert_equal(type(read_request.attributes[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl][0].targets), Clusters.Types.Nullable)
asserts.assert_not_equal(type(read_request.attributes[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl][1].targets), Clusters.Types.Nullable)
asserts.assert_equal(type(read_request.attributes[0][Clusters.AccessControl]
[Clusters.AccessControl.Attributes.Acl][0].targets), Clusters.Types.Nullable)
asserts.assert_not_equal(type(read_request.attributes[0][Clusters.AccessControl]
[Clusters.AccessControl.Attributes.Acl][1].targets), Clusters.Types.Nullable)
asserts.assert_equal(len(read_request.tlvAttributes[0][31][0]), 2)

# attribute_path = AttributePath(
# EndpointId=0,
# ClusterId=None,
# AttributeId=Clusters.Descriptor.Attributes.ServerList.attribute_id)

# read_request = await self.default_controller.ReadAttribute(
# self.dut_node_id,
# [attribute_path]
Expand All @@ -627,12 +634,13 @@ async def test_TC_IDM_2_2(self):
# Cleanup
await self.default_controller.WriteAttribute(self.dut_node_id, [(endpoint, Clusters.AccessControl.Attributes.Acl(dut_acl_original))])
read_request = await self.default_controller.Read(self.dut_node_id,
[(endpoint, Clusters.AccessControl.Attributes.Acl)],
)
[(endpoint, Clusters.AccessControl.Attributes.Acl)],
)
asserts.assert_equal(len(read_request.tlvAttributes[0][31][0]), 1)
asserts.assert_equal(len(read_request.attributes[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl]), 1)
asserts.assert_equal(type(read_request.attributes[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl][0].targets), Clusters.Types.Nullable)

asserts.assert_equal(type(read_request.attributes[0][Clusters.AccessControl]
[Clusters.AccessControl.Attributes.Acl][0].targets), Clusters.Types.Nullable)

# Step 31

# TH should have access to only a single cluster at one Endpoint1.
Expand Down

0 comments on commit 5c4c6e4

Please sign in to comment.