Update TC_RR_1_1.py to include new group tests as well.
cletnick committed Jan 17, 2023
1 parent 3634df2 commit c2089bd
Showing 1 changed file with 227 additions and 0 deletions: src/python_testing/TC_RR_1_1.py
@@ -17,15 +17,18 @@

import asyncio
import logging
import math
import queue
import random
import time
from binascii import hexlify
from threading import Event
from typing import Any, Dict, List

import chip.CertificateAuthority
import chip.clusters as Clusters
import chip.FabricAdmin
from chip import ChipDeviceCtrl
from chip.clusters.Attribute import AttributeStatus, SubscriptionTransaction, TypedAttributePath
from chip.clusters.Types import NullValue
from chip.interaction_model import Status as StatusEnum
from chip.utils import CommissioningBuildingBlocks
@@ -351,6 +354,59 @@ async def test_TC_RR_1_1(self):
else:
logging.info("Step 9: Skipped due to no UserLabel cluster instances")

# Step 10: Count all group cluster instances
# and ensure MaxGroupsPerFabric >= 4 * counted_groups_clusters.
logging.info("Step 10: Validating groups support minimums")
groups_cluster_endpoints: Dict[int, Any] = await dev_ctrl.ReadAttribute(self.dut_node_id, [Clusters.Groups])
counted_groups_clusters: int = len(groups_cluster_endpoints)
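# Note: the wildcard read returns one entry per endpoint hosting a Groups cluster,
# so the length of the result equals the number of Groups cluster instances.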

# The Step 10 check and all of Steps 11 through 14 are performed only if Groups cluster instances are found.
if counted_groups_clusters > 0:
indicated_max_groups_per_fabric: int = await self.read_single_attribute(dev_ctrl,
node_id=self.dut_node_id,
endpoint=0,
attribute=Clusters.GroupKeyManagement.Attributes.MaxGroupsPerFabric)
if indicated_max_groups_per_fabric < 4 * counted_groups_clusters:
asserts.fail(f"Failed Step 10: MaxGroupsPerFabric < 4 * counted_groups_clusters")

# Step 11: Confirm MaxGroupKeysPerFabric meets the minimum requirement of 3.
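# (Presumably this minimum accounts for key set 0, the IPK, plus at least two writable application key sets.)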
indicated_max_group_keys_per_fabric: int = await self.read_single_attribute(dev_ctrl,
node_id=self.dut_node_id,
endpoint=0,
attribute=Clusters.GroupKeyManagement.Attributes.MaxGroupKeysPerFabric)
if indicated_max_group_keys_per_fabric < 3:
asserts.fail(f"Failed Step 11: MaxGroupKeysPerFabric < 3")

# Create a list of per-fabric clients to use for filling group resources across all fabrics.
fabric_unique_clients: List[Any] = []
for fabric_idx in range(num_fabrics_to_commission):
fabric_number: int = fabric_idx + 1
# Client is client A for each fabric to set the Label field
client_name: str = "RD%dA" % fabric_number
fabric_unique_clients.append(client_by_name[client_name])

# Step 12: Write and verify indicated_max_group_keys_per_fabric group keys to all fabrics.
group_keys: List[List[Clusters.GroupKeyManagement.Structs.GroupKeySetStruct]] = await self.fill_and_validate_group_key_sets(
num_fabrics_to_commission, fabric_unique_clients, indicated_max_group_keys_per_fabric)

# Step 13: Write and verify indicated_max_groups_per_fabric group/key mappings for all fabrics.
# First, generate the list of unique group/key mappings.
# Use a comprehension rather than [{}] * n so each fabric gets its own dict.
group_key_map: List[Dict[int, int]] = [{} for _ in range(num_fabrics_to_commission)]
for fabric_idx in range(num_fabrics_to_commission):
for group_idx in range(indicated_max_groups_per_fabric):
group_id: int = fabric_idx * indicated_max_groups_per_fabric + group_idx + 1
group_key_idx: int = group_idx % len(group_keys[fabric_idx])
group_key_map[fabric_idx][group_id] = group_keys[fabric_idx][group_key_idx].groupKeySetID
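# Worked example of the mapping above (assuming 2 fabrics, indicated_max_groups_per_fabric = 3,
# and 2 writable key sets per fabric): fabric 0 gets group IDs 1..3, fabric 1 gets 4..6, and
# the group IDs alternate between the fabric's key sets, e.g. {1: ksA, 2: ksB, 3: ksA}.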

await self.fill_and_validate_group_key_map(
num_fabrics_to_commission, fabric_unique_clients, group_key_map)

# Step 14: Add all the groups to the discovered groups-supporting endpoints and verify GroupTable
group_table_written: List[Dict[int, Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct]] = await self.add_all_groups(
num_fabrics_to_commission, fabric_unique_clients, group_key_map, groups_cluster_endpoints, indicated_max_groups_per_fabric)
await self.validate_group_table(num_fabrics_to_commission, fabric_unique_clients, group_table_written)

def random_string(self, length) -> str:
# Draws from the test's seeded PRNG, so the generated group names and epoch keys are reproducible across runs.
rnd = self._pseudo_random_generator
return "".join([rnd.choice("abcdef0123456789") for _ in range(length)])[:length]
@@ -378,6 +434,158 @@ async def fill_user_label_list(self, dev_ctrl, target_node_id):

asserts.assert_equal(read_back_labels, labels, "LabelList attribute must match what was written")

async def fill_and_validate_group_key_sets(self,
fabrics: int,
clients: List[Any],
keys_per_fabric: int) -> List[List[Clusters.GroupKeyManagement.Structs.GroupKeySetStruct]]:
# Step 12: Write indicated_max_group_keys_per_fabric group keys to all fabrics.
# Use a comprehension rather than [[]] * fabrics, which would alias one shared list across fabrics.
group_keys: List[List[Clusters.GroupKeyManagement.Structs.GroupKeySetStruct]] = [[] for _ in range(fabrics)]
for fabric_idx in range(fabrics):
client: Any = clients[fabric_idx]

# Write, skip the IPK key set.
for group_key_cluster_idx in range(1, keys_per_fabric):
group_key_list_idx: int = group_key_cluster_idx - 1

logging.info("Step 12: Setting group key on fabric %d at index '%d'" % (fabric_idx+1, group_key_cluster_idx))
group_keys[fabric_idx].append(self.build_group_key(fabric_idx, group_key_cluster_idx, keys_per_fabric))
await client.SendCommand(self.dut_node_id, 0, Clusters.GroupKeyManagement.Commands.KeySetWrite(group_keys[fabric_idx][group_key_list_idx]))
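# Key set 0 is reserved for the IPK installed during commissioning; overwriting it
# would break operational group messaging, so writes start at index 1.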

# Step 12 verification: After all the key sets were written, read all the information back.
for fabric_idx in range(fabrics):
client: Any = clients[fabric_idx]

# Read, skip the IPK key set.
for group_key_cluster_idx in range(1, keys_per_fabric):
group_key_list_idx: int = group_key_cluster_idx - 1

logging.info("Step 12: Reading back group key on fabric %d at index ''%d'" % (fabric_idx+1, group_key_cluster_idx))
key_set = await client.SendCommand(self.dut_node_id, 0,
Clusters.GroupKeyManagement.Commands.KeySetRead(
group_keys[fabric_idx][group_key_list_idx].groupKeySetID),
responseType=Clusters.GroupKeyManagement.Commands.KeySetReadResponse)
logging.info("Step 12: Read-back key set: %s" % key_set)
asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].groupKeySetID,
key_set.groupKeySetID, "Received incorrect key set.")
asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].groupKeySecurityPolicy,
key_set.groupKeySecurityPolicy)
asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].epochStartTime0, key_set.epochStartTime0)
asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].epochStartTime1, key_set.epochStartTime1)
asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].epochStartTime2, key_set.epochStartTime2)
asserts.assert_equal(NullValue, key_set.epochKey0, "Unexpected key response from read.")
asserts.assert_equal(NullValue, key_set.epochKey1, "Unexpected key response from read.")
asserts.assert_equal(NullValue, key_set.epochKey2, "Unexpected key response from read.")
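# KeySetRead responses never echo key material back: the epochKey fields are null by
# design, so only the key set metadata (ID, policy, epoch start times) is verifiable.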

return group_keys

async def fill_and_validate_group_key_map(self,
fabrics: int,
clients: List[Any],
group_key_map: List[Dict[int, int]]) -> None:
# Step 13: Write and verify indicated_max_groups_per_fabric group/key mappings for all fabrics.
# As above, avoid [[]] * fabrics so each fabric gets its own list.
mapping_structs: List[List[Clusters.GroupKeyManagement.Structs.GroupKeyMapStruct]] = [[] for _ in range(fabrics)]
for fabric_idx in range(fabrics):
client: Any = clients[fabric_idx]

for group in group_key_map[fabric_idx]:
mapping_structs[fabric_idx].append(Clusters.GroupKeyManagement.Structs.GroupKeyMapStruct(groupId=group,
groupKeySetID=group_key_map[fabric_idx][group],
fabricIndex=fabric_idx))

logging.info("Step 13: Setting group key map on fabric %d" % (fabric_idx+1))
await client.WriteAttribute(self.dut_node_id, [(0, Clusters.GroupKeyManagement.Attributes.GroupKeyMap(mapping_structs[fabric_idx]))])

# Step 13 verification: After all the group key maps were written, read all the information back.
for fabric_idx in range(fabrics):
client: Any = clients[fabric_idx]

logging.info("Step 13: Reading group key map on fabric %d" % (fabric_idx+1))
group_key_map_readback = await self.read_single_attribute(client, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.GroupKeyManagement.Attributes.GroupKeyMap)

found_entry: int = 0
for read_entry in group_key_map_readback:
if read_entry.fabricIndex != fabric_idx:
continue

written_entry = next(entry for entry in mapping_structs[fabric_idx] if entry.groupId == read_entry.groupId)
found_entry += 1
asserts.assert_equal(written_entry.groupId, read_entry.groupId)
asserts.assert_equal(written_entry.groupKeySetID, read_entry.groupKeySetID)
asserts.assert_equal(written_entry.fabricIndex, read_entry.fabricIndex)

asserts.assert_equal(found_entry, len(mapping_structs[fabric_idx]),
"GroupKeyMap does not match the length of written data.")

async def add_all_groups(self,
fabrics: int,
clients: List[Any],
group_key_map: List[Dict[int, int]],
group_endpoints: Dict[int, Any],
groups_per_fabric: int) -> List[Dict[int, Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct]]:
# Step 14: Add indicated_max_groups_per_fabric to each fabric through the Groups clusters on supporting endpoints.
written_group_table_map: List[Dict[int, Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct]] = [{} for _ in range(fabrics)]
for fabric_idx in range(fabrics):
client: Any = clients[fabric_idx]

base_groups_per_endpoint: int = math.floor(groups_per_fabric / len(group_endpoints))
groups_remainder: int = groups_per_fabric % len(group_endpoints)
fabric_group_index: int = 0

for endpoint_id in group_endpoints:
# Distribute the remainder of the division one group at a time over the first endpoints.
groups_to_add: int = base_groups_per_endpoint
if groups_remainder > 0:
    groups_to_add += 1
    groups_remainder -= 1
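# e.g. 8 groups over 3 endpoints: base = 2, remainder = 2, so the endpoints get 3, 3, and 2 groups.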

feature_map: int = await self.read_single_attribute(client,
node_id=self.dut_node_id,
endpoint=endpoint_id,
attribute=Clusters.Groups.Attributes.FeatureMap)
name_feature_bit: int = 0
name_supported: bool = (feature_map & (1 << name_feature_bit)) != 0
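# Bit 0 of the Groups cluster FeatureMap is the GroupNames (GN) feature; group names
# are only written when the server advertises it.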

# Write groups to cluster
for group_idx in range(fabric_group_index, fabric_group_index + groups_to_add):
group_name: str = self.random_string(16) if name_supported else ""
group_id: int = list(group_key_map[fabric_idx].keys())[group_idx]
command: Clusters.Groups.Commands.AddGroup = Clusters.Groups.Commands.AddGroup(
groupID=group_id, groupName=group_name)
written_group_table_map[fabric_idx][group_id] = Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct(groupId=group_id,
groupName=group_name,
fabricIndex=fabric_idx,
endpoints=[endpoint_id])
add_response: Clusters.Groups.Commands.AddGroupResponse = await client.SendCommand(self.dut_node_id, endpoint_id, command,
responseType=Clusters.Groups.Commands.AddGroupResponse)
asserts.assert_equal(StatusEnum.Success, add_response.status)
asserts.assert_equal(group_id, add_response.groupID)

# Still inside the per-endpoint loop: advance the index so the next endpoint
# gets a fresh, non-overlapping slice of this fabric's group IDs.
fabric_group_index += groups_to_add

return written_group_table_map

async def validate_group_table(self,
fabrics: int,
clients: List[Any],
group_table_written: List[Dict[int, Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct]]) -> None:
for fabric_idx in range(fabrics):
client: Any = clients[fabric_idx]
group_table_read: List[Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct] = await self.read_single_attribute(
client, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.GroupKeyManagement.Attributes.GroupTable)

found_groups: int = 0
for read_entry in group_table_read:
if read_entry.fabricIndex != fabric_idx:
continue

found_groups += 1
asserts.assert_in(read_entry.groupId, group_table_written[fabric_idx], "Group missing from group map")
written_entry: Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct = group_table_written[
fabric_idx][read_entry.groupId]
asserts.assert_equal(written_entry.groupId, read_entry.groupId)
asserts.assert_equal(written_entry.endpoints, read_entry.endpoints)
asserts.assert_equal(written_entry.groupName, read_entry.groupName)
asserts.assert_equal(written_entry.fabricIndex, read_entry.fabricIndex)

asserts.assert_equal(found_groups, len(group_table_written[fabric_idx]),
"Found group count does not match written value.")

def build_acl(self, fabric_number, client_by_name, num_controllers_per_fabric):
acl = []

@@ -462,6 +670,25 @@ def build_acl(self, fabric_number, client_by_name, num_controllers_per_fabric):

return acl

def build_group_key(self, fabric_index: int, group_key_index: int, keys_per_fabric: int) -> Clusters.GroupKeyManagement.Structs.GroupKeySetStruct:
asserts.assert_not_equal(group_key_index, 0, "TH Internal Error: IPK key set index (0) should not be re-generated.")

# groupKeySetID is defined as uint16 in the Matter specification.
# To easily test that the stored values are unique, unique values are created across all fabrics.
# However, the specification only requires that values be unique within a fabric.
# If a device ever provides over 65535 total key sets, then this will need to be updated.
set_id: int = fabric_index*keys_per_fabric + group_key_index
asserts.assert_less_equal(
set_id, 0xFFFF, "Invalid Key Set ID. This may be a limitation of the test harness, not the device under test.")
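# Worked example of the IDs generated here (assuming keys_per_fabric = 3): fabric_index 0
# yields set IDs 1 and 2, fabric_index 1 yields 4 and 5, and so on; ID 0 is never produced,
# since the assert above guarantees the IPK's key set index is skipped.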
return Clusters.GroupKeyManagement.Structs.GroupKeySetStruct(groupKeySetID=set_id,
groupKeySecurityPolicy=Clusters.GroupKeyManagement.Enums.GroupKeySecurityPolicy.kTrustFirst,
epochKey0=self.random_string(16).encode(),
epochStartTime0=(set_id * 4),
epochKey1=self.random_string(16).encode(),
epochStartTime1=(set_id * 4 + 1),
epochKey2=self.random_string(16).encode(),
epochStartTime2=(set_id * 4 + 2))


if __name__ == "__main__":
default_matter_test_main(maximize_cert_chains=True, controller_cat_tags=[0x0001_0001])
