Skip to content

Commit 2227845

Browse files
authored
Merge 75e368c into a1bafe1
2 parents a1bafe1 + 75e368c commit 2227845

File tree

1 file changed

+255
-0
lines changed

1 file changed

+255
-0
lines changed

src/python_testing/TC_CADMIN_1_3.py

+255
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
#
2+
# Copyright (c) 2024 Project CHIP Authors
3+
# All rights reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
# === BEGIN CI TEST ARGUMENTS ===
18+
# test-runner-runs:
19+
# run1:
20+
# app: ${ALL_CLUSTERS_APP}
21+
# factoryreset: true
22+
# quiet: true
23+
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
24+
# script-args: >
25+
# --storage-path admin_storage.json
26+
# --commissioning-method on-network
27+
# --discriminator 1234
28+
# --passcode 20202021
29+
# --trace-to json:${TRACE_TEST_JSON}.json
30+
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
31+
# === END CI TEST ARGUMENTS ===
32+
33+
import logging
34+
import random
35+
from time import sleep
36+
37+
import chip.clusters as Clusters
38+
from chip import ChipDeviceCtrl
39+
from chip.ChipDeviceCtrl import CommissioningParameters
40+
from chip.exceptions import ChipStackError
41+
from chip.interaction_model import Status
42+
from chip.native import PyChipError
43+
from chip.tlv import TLVReader
44+
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
45+
from mdns_discovery import mdns_discovery
46+
from mobly import asserts
47+
48+
opcreds = Clusters.OperationalCredentials
49+
nonce = random.randbytes(32)
50+
51+
52+
class TC_CADMIN_1_3(MatterBaseTest):
53+
async def CommissionAttempt(
54+
self, setupPinCode: int, thnum: int, th: str, fail: bool):
55+
56+
logging.info(f"-----------------Commissioning with TH_CR{str(thnum)}-------------------------")
57+
if fail:
58+
ctx = asserts.assert_raises(ChipStackError)
59+
self.print_step(0, ctx)
60+
with ctx:
61+
await th.CommissionOnNetwork(
62+
nodeId=self.dut_node_id, setupPinCode=setupPinCode,
63+
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.discriminator)
64+
errcode = PyChipError.from_code(ctx.exception.err)
65+
logging.info('Commissioning complete done. Successful? {}, errorcode = {}'.format(
66+
errcode.is_success, errcode))
67+
asserts.assert_false(errcode.is_success, 'Commissioning complete did not error as expected')
68+
asserts.assert_true(errcode.sdk_code == 0x0000000B,
69+
'Unexpected error code returned from CommissioningComplete')
70+
71+
elif not fail:
72+
await th.CommissionOnNetwork(
73+
nodeId=self.dut_node_id, setupPinCode=setupPinCode,
74+
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.discriminator)
75+
76+
async def get_fabrics(self, th: ChipDeviceCtrl) -> int:
77+
OC_cluster = Clusters.OperationalCredentials
78+
fabric_info = await self.read_single_attribute_check_success(dev_ctrl=th, fabric_filtered=True, cluster=OC_cluster, attribute=OC_cluster.Attributes.Fabrics)
79+
return fabric_info
80+
81+
async def get_rcac_decoded(self, th: str) -> int:
82+
csrResponse = await self.send_single_cmd(dev_ctrl=th, node_id=self.dut_node_id, cmd=opcreds.Commands.CSRRequest(CSRNonce=nonce, isForUpdateNOC=False))
83+
TH_certs_real = await th.IssueNOCChain(csrResponse, self.dut_node_id)
84+
th_rcac_decoded = TLVReader(TH_certs_real.rcacBytes).get()["Any"]
85+
return th_rcac_decoded
86+
87+
async def get_txt_record(self):
88+
discovery = mdns_discovery.MdnsDiscovery(verbose_logging=True)
89+
comm_service = await discovery.get_commissionable_service(
90+
discovery_timeout_sec=240,
91+
log_output=False,
92+
)
93+
return comm_service
94+
95+
async def get_window_status(self) -> int:
96+
AC_cluster = Clusters.AdministratorCommissioning
97+
window_status = await self.read_single_attribute_check_success(dev_ctrl=self.th2, fabric_filtered=True, cluster=AC_cluster, attribute=AC_cluster.Attributes.WindowStatus)
98+
return window_status
99+
100+
async def OpenCommissioningWindow(self, th: ChipDeviceCtrl, duration: int) -> CommissioningParameters:
101+
self.discriminator = random.randint(0, 4095)
102+
try:
103+
params = await th.OpenCommissioningWindow(
104+
nodeid=self.dut_node_id, timeout=duration, iteration=10000, discriminator=self.discriminator, option=1)
105+
return params
106+
107+
except Exception as e:
108+
logging.exception('Error running OpenCommissioningWindow %s', e)
109+
if str(Clusters.AdministratorCommissioning.Enums.StatusCode.kBusy) in e.msg:
110+
asserts.assert_true(False, 'Failed to open commissioning window')
111+
else:
112+
asserts.assert_true(False, 'Failed to verify')
113+
114+
async def write_nl_attr(self, th: str, attr_val: object):
115+
result = await th.WriteAttribute(self.dut_node_id, [(0, attr_val)])
116+
asserts.assert_equal(result[0].Status, Status.Success, f"{th} node label write failed")
117+
118+
async def read_nl_attr(self, th: str, attr_val: object):
119+
try:
120+
await th.ReadAttribute(nodeid=self.dut_node_id, attributes=[(0, attr_val)])
121+
except Exception as e:
122+
asserts.assert_equal(e.err, "Received error message from read attribute attempt")
123+
124+
def pics_TC_CADMIN_1_3(self) -> list[str]:
125+
return ["CADMIN.S"]
126+
127+
def steps_TC_CADMIN_1_3(self) -> list[TestStep]:
128+
return [
129+
TestStep(1, "TH_CR1 starts a commissioning process with DUT_CE", is_commissioning=True),
130+
TestStep(2, "TH_CR1 reads the BasicCommissioningInfo attribute from the General Commissioning cluster and saves the MaxCumulativeFailsafeSeconds field as max_window_duration."),
131+
TestStep("3a", "TH_CR1 opens a commissioning window on DUT_CE using a commissioning timeout of max_window_duration using ECM",
132+
"DUT_CE opens its Commissioning window to allow a second commissioning."),
133+
TestStep("3b", "DNS-SD records shows DUT_CE advertising", "Verify that the DNS-SD advertisement shows CM=2"),
134+
TestStep("3c", "TH_CR1 writes and reads the Basic Information Cluster’s NodeLabel mandatory attribute of DUT_CE",
135+
"Verify DUT_CE responds to both write/read with a success"),
136+
TestStep(4, "TH creates a controller (TH_CR2) on a new fabric and commissions DUT_CE using that controller. TH_CR2 should commission the device using a different NodeID than TH_CR1.",
137+
"Commissioning is successful"),
138+
TestStep(5, "TH_CR1 reads the Fabrics attribute from the Node Operational Credentials cluster using a fabric-filtered read",
139+
"Verify that the RootPublicKey matches the root public key for TH_CR1 and the NodeID matches the node ID used when TH_CR1 commissioned the device."),
140+
TestStep(6, "TH_CR2 reads the Fabrics attribute from the Node Operational Credentials cluster using a fabric-filtered read",
141+
"Verify that the RootPublicKey matches the root public key for TH_CR2 and the NodeID matches the node ID used when TH_CR2 commissioned the device."),
142+
TestStep(7, "TH_CR1 writes and reads the Basic Information Cluster’s NodeLabel mandatory attribute of DUT_CE",
143+
"Verify DUT_CE responds to both write/read with a success"),
144+
TestStep(8, "TH_CR2 reads, writes and then reads the Basic Information Cluster’s NodeLabel mandatory attribute of DUT_CE",
145+
"Verify the initial read reflect the value written in the above step. Verify DUT_CE responds to both write/read with a success"),
146+
TestStep(9, "TH_CR2 opens a commissioning window on DUT_CE for 180 seconds using ECM"),
147+
TestStep(10, "Wait for the commissioning window in step 9 to timeout"),
148+
TestStep(11, "TH_CR2 reads the window status to verify the DUT_CE window is closed",
149+
"DUT_CE windows status shows the window is closed"),
150+
TestStep(12, "TH_CR2 opens a commissioning window on DUT_CE using ECM",
151+
"DUT_CE opens its Commissioning window to allow a new commissioning"),
152+
TestStep(13, "TH_CR1 starts a commissioning process with DUT_CE before the timeout from step 12",
153+
"Since DUT_CE was already commissioned by TH_CR1 in step 1, AddNOC fails with NOCResponse with StatusCode field set to FabricConflict (9)"),
154+
]
155+
156+
@async_test_body
157+
async def test_TC_CADMIN_1_3(self):
158+
self.step(1)
159+
160+
# Establishing TH1
161+
self.th1 = self.default_controller
162+
163+
self.step(2)
164+
GC_cluster = Clusters.GeneralCommissioning
165+
attribute = GC_cluster.Attributes.BasicCommissioningInfo
166+
duration = await self.read_single_attribute_check_success(endpoint=0, cluster=GC_cluster, attribute=attribute)
167+
self.max_window_duration = duration.maxCumulativeFailsafeSeconds
168+
169+
self.step("3a")
170+
params = await self.OpenCommissioningWindow(th=self.th1, duration=self.max_window_duration)
171+
setupPinCode = params.setupPinCode
172+
173+
self.step("3b")
174+
services = await self.get_txt_record()
175+
if services.txt_record['CM'] != "2":
176+
asserts.fail(f"Expected cm record value not found, instead value found was {str(services.txt_record['CM'])}")
177+
178+
self.step("3c")
179+
BI_cluster = Clusters.BasicInformation
180+
nl_attribute = BI_cluster.Attributes.NodeLabel
181+
await self.write_nl_attr(th=self.th1, attr_val=nl_attribute)
182+
await self.read_nl_attr(th=self.th1, attr_val=nl_attribute)
183+
184+
self.step(4)
185+
# Establishing TH2
186+
th2_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
187+
th2_fabric_admin = th2_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=self.th1.fabricId + 1)
188+
self.th2 = th2_fabric_admin.NewController(nodeId=2, useTestCommissioner=True)
189+
await self.CommissionAttempt(setupPinCode, th=self.th2, fail=False, thnum=2)
190+
191+
self.step(5)
192+
# TH_CR1 reads the Fabrics attribute from the Node Operational Credentials cluster using a fabric-filtered read
193+
th1_fabric_info = await self.get_fabrics(th=self.th1)
194+
195+
# Verify that the RootPublicKey matches the root public key for TH_CR1 and the NodeID matches the node ID used when TH_CR1 commissioned the device.
196+
await self.send_single_cmd(dev_ctrl=self.th1, node_id=self.dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(10))
197+
th1_rcac_decoded = await self.get_rcac_decoded(th=self.th1)
198+
if th1_fabric_info[0].rootPublicKey != th1_rcac_decoded[9]:
199+
asserts.fail("public keys from fabric and certs for TH1 are not the same")
200+
if th1_fabric_info[0].nodeID != self.dut_node_id:
201+
asserts.fail("DUT node ID from fabric does not equal DUT node ID for TH1 during commissioning")
202+
203+
# Expiring the failsafe timer in an attempt to clean up before TH2 attempt.
204+
await self.th1.SendCommand(self.dut_node_id, 0, Clusters.GeneralCommissioning.Commands.ArmFailSafe(0))
205+
206+
self.step(6)
207+
# TH_CR2 reads the Fabrics attribute from the Node Operational Credentials cluster using a fabric-filtered read
208+
th2_fabric_info = await self.get_fabrics(th=self.th2)
209+
210+
# Verify that the RootPublicKey matches the root public key for TH_CR2 and the NodeID matches the node ID used when TH_CR2 commissioned the device.
211+
await self.send_single_cmd(dev_ctrl=self.th2, node_id=self.dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(self.max_window_duration))
212+
th2_rcac_decoded = await self.get_rcac_decoded(th=self.th2)
213+
if th2_fabric_info[0].rootPublicKey != th2_rcac_decoded[9]:
214+
asserts.fail("public keys from fabric and certs for TH2 are not the same")
215+
if th2_fabric_info[0].nodeID != self.dut_node_id:
216+
asserts.fail("DUT node ID from fabric does not match DUT node ID for TH2 during commissioning")
217+
218+
await self.th2.SendCommand(self.dut_node_id, 0, Clusters.GeneralCommissioning.Commands.ArmFailSafe(0))
219+
220+
self.step(7)
221+
# TH_CR1 writes and reads the Basic Information Cluster’s NodeLabel mandatory attribute of DUT_CE
222+
await self.write_nl_attr(th=self.th1, attr_val=nl_attribute)
223+
await self.read_nl_attr(th=self.th1, attr_val=nl_attribute)
224+
225+
self.step(8)
226+
# TH_CR2 writes and reads the Basic Information Cluster’s NodeLabel mandatory attribute of DUT_CE
227+
await self.write_nl_attr(th=self.th2, attr_val=nl_attribute)
228+
await self.read_nl_attr(th=self.th2, attr_val=nl_attribute)
229+
sleep(1)
230+
231+
self.step(9)
232+
# TH_CR2 opens a commissioning window on DUT_CE for 180 seconds using ECM
233+
await self.OpenCommissioningWindow(th=self.th2, duration=180)
234+
235+
self.step(10)
236+
# Wait for the commissioning window in step 9 to timeout
237+
sleep(180)
238+
239+
self.step(11)
240+
# TH_CR2 reads the window status to verify the DUT_CE window is closed
241+
window_status = await self.get_window_status()
242+
# window_status = await self.th2.ReadAttribute(nodeid=self.dut_node_id, attributes=[(0, Clusters.AdministratorCommissioning.Attributes.WindowStatus)])
243+
AC_cluster = Clusters.AdministratorCommissioning
244+
self.print_step(0, dir(AC_cluster.Enums))
245+
self.print_step(1, window_status)
246+
247+
self.step(12)
248+
# TH_CR2 opens a commissioning window on DUT_CE using ECM
249+
250+
self.step(13)
251+
# TH_CR1 starts a commissioning process with DUT_CE before the timeout from step 12
252+
253+
254+
if __name__ == "__main__":
255+
default_matter_test_main()

0 commit comments

Comments
 (0)