diff --git a/.gitignore b/.gitignore index faf75845..15f03246 100644 --- a/.gitignore +++ b/.gitignore @@ -70,4 +70,8 @@ ENV/ .*.swp # Visual Studio Code -.vscode \ No newline at end of file +.vscode + + +# macOS +.DS_Store \ No newline at end of file diff --git a/bellows/ezsp/v7/commands.py b/bellows/ezsp/v7/commands.py index 3da7314b..4c0d89bd 100644 --- a/bellows/ezsp/v7/commands.py +++ b/bellows/ezsp/v7/commands.py @@ -137,7 +137,7 @@ "getChildData": ( 0x4A, (t.uint8_t,), - (t.EmberStatus, t.EmberNodeId, t.EmberEUI64, t.EmberNodeType), + (t.EmberStatus, t.EmberChildData), ), "getSourceRouteTableTotalSize": (0xC3, (), (t.uint8_t,)), "getSourceRouteTableFilledSize": (0xC2, (), (t.uint8_t,)), diff --git a/bellows/ezsp/v7/types/struct.py b/bellows/ezsp/v7/types/struct.py index 7a454846..be0259dc 100644 --- a/bellows/ezsp/v7/types/struct.py +++ b/bellows/ezsp/v7/types/struct.py @@ -189,3 +189,25 @@ class EmberPerDeviceDutyCycle(EzspStruct): nodeId: named.EmberNodeId # Amount of overall duty cycle consumed (up to suspend limit). dutyCycleConsumed: named.EmberDutyCycleHectoPct + + +class EmberChildData(EzspStruct): + """A structure containing a child node's data.""" + + # The EUI64 of the child + eui64: named.EmberEUI64 + # The node type of the child + type: named.EmberNodeType + # The short address of the child + id: named.EmberNodeId + # The phy of the child + phy: basic.uint8_t + # The power of the child + power: basic.uint8_t + # The timeout of the child + timeout: basic.uint8_t + + # The GPD's EUI64. + # gpdIeeeAddress: named.EmberEUI64 + # The GPD's source ID. + # sourceId: basic.uint32_t diff --git a/bellows/ezsp/v8/commands.py b/bellows/ezsp/v8/commands.py index 0c6a5925..ea2b07cc 100644 --- a/bellows/ezsp/v8/commands.py +++ b/bellows/ezsp/v8/commands.py @@ -134,7 +134,7 @@ "getChildData": ( 0x004A, (t.uint8_t,), - (t.EmberStatus, t.EmberNodeId, t.EmberEUI64, t.EmberNodeType), + (t.EmberStatus, t.EmberChildData), ), "getSourceRouteTableTotalSize": (0x00C3, (), (t.uint8_t,)), "getSourceRouteTableFilledSize": (0x00C2, (), (t.uint8_t,)), diff --git a/bellows/ezsp/v8/types/struct.py b/bellows/ezsp/v8/types/struct.py index 8a0fd328..f1a56628 100644 --- a/bellows/ezsp/v8/types/struct.py +++ b/bellows/ezsp/v8/types/struct.py @@ -202,3 +202,25 @@ class EmberTransientKeyData(EzspStruct): # The number of seconds remaining before the key is automatically timed out of the # transient key table. remainingTimeSeconds: basic.uint16_t + + +class EmberChildData(EzspStruct): + """A structure containing a child node's data.""" + + # The EUI64 of the child + eui64: named.EmberEUI64 + # The node type of the child + type: named.EmberNodeType + # The short address of the child + id: named.EmberNodeId + # The phy of the child + phy: basic.uint8_t + # The power of the child + power: basic.uint8_t + # The timeout of the child + timeout: basic.uint8_t + + # The GPD's EUI64. + # gpdIeeeAddress: named.EmberEUI64 + # The GPD's source ID. + # sourceId: basic.uint32_t diff --git a/bellows/types/named.py b/bellows/types/named.py index 65cabbfc..7fc1792a 100644 --- a/bellows/types/named.py +++ b/bellows/types/named.py @@ -610,6 +610,9 @@ class EmberStatus(basic.enum8): # An index was passed into the function that was larger than the valid # range. INDEX_OUT_OF_RANGE = 0xB1 + # The passed key data is not valid. A key of all zeros or all F's are reserved + # values and cannot be used. + KEY_INVALID = 0xB2 # There are no empty entries left in the table. TABLE_FULL = 0xB4 # The requested table entry has been erased and contains no valid data. @@ -970,6 +973,8 @@ class EmberCurrentSecurityBitmask(basic.bitmap16): GLOBAL_LINK_KEY = 0x0004 # This denotes that the node has a Trust Center Link Key. HAVE_TRUST_CENTER_LINK_KEY = 0x0010 + # TODO: 0x0020 is unknown + # TODO: 0x0040 is unknown # This denotes that the Trust Center is using a Hashed Link Key. TRUST_CENTER_USES_HASHED_LINK_KEY = 0x0084 @@ -994,6 +999,12 @@ class EmberKeyStructBitmask(basic.bitmap16): # hears a device announce from the partner indicating it is not # an 'RX on when idle' device. KEY_PARTNER_IS_SLEEPY = 0x0020 + # This indicates that the transient key which is being added is unconfirmed. This + # bit is set when we add a transient key while the EmberTcLinkKeyRequestPolicy is + # EMBER_ALLOW_TC_LINK_KEY_REQUEST_AND_GENERATE_NEW_KEY + UNCONFIRMED_TRANSIENT_KEY = 0x0040 + + # TODO: 0x0080 is unknown class EmberKeyStatus(basic.enum8): @@ -1757,3 +1768,22 @@ class sl_Status(basic.enum32): SL_STATUS_WIFI_RETRY_EXCEEDED = 0x0B1F # The request failed because the MSDU life time was exceeded SL_STATUS_WIFI_TX_LIFETIME_EXCEEDED = 0x0B20 + + +class EmberDistinguishedNodeId(basic.enum16): + """A distinguished network ID that will never be assigned to any node""" + + # This value is used when getting the remote node ID from the address or binding + # tables. It indicates that the address or binding table entry is currently in use + # and network address discovery is underway. + DISCOVERY_ACTIVE = 0xFFFC + + # This value is used when getting the remote node ID from the address or binding + # tables. It indicates that the address or binding table entry is currently in use + # but the node ID corresponding to the EUI64 in the table is currently unknown. + UNKNOWN = 0xFFFD + + # This value is used when setting or getting the remote node ID in the address table + # or getting the remote node ID from the binding table. It indicates that the + # address or binding table entry is not in use. + TABLE_ENTRY_UNUSED = 0xFFFF diff --git a/bellows/types/struct.py b/bellows/types/struct.py index 37b17691..2c1b1387 100644 --- a/bellows/types/struct.py +++ b/bellows/types/struct.py @@ -2,8 +2,6 @@ import inspect import typing -import zigpy.state as app_state - from . import basic, named NoneType = type(None) @@ -282,19 +280,6 @@ class EmberNetworkParameters(EzspStruct): # method. channels: named.Channels - @property - def zigpy_network_information(self) -> app_state.NetworkInformation: - """Convert to NetworkInformation.""" - r = app_state.NetworkInformation( - self.extendedPanId, - app_state.t.PanId(self.panId), - self.nwkUpdateId, - app_state.t.NWK(self.nwkManagerId), - self.radioChannel, - channel_mask=self.channels, - ) - return r - class EmberZigbeeNetwork(EzspStruct): # The parameters of a ZigBee network. diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index 3e5f119d..384fcffe 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -9,12 +9,14 @@ import zigpy.application import zigpy.config import zigpy.device -from zigpy.quirks import CustomDevice, CustomEndpoint -import zigpy.state as app_state -from zigpy.types import Addressing, BroadcastAddress +import zigpy.endpoint +from zigpy.exceptions import FormationFailure, NetworkNotFormed +import zigpy.state +import zigpy.types import zigpy.util import zigpy.zdo.types as zdo_t +import bellows from bellows.config import ( CONF_PARAM_SRC_RTG, CONF_PARAM_UNK_DEV, @@ -26,7 +28,7 @@ from bellows.ezsp.v8.types.named import EmberDeviceUpdate import bellows.multicast import bellows.types as t -import bellows.zigbee.util +import bellows.zigbee.util as util APS_ACK_TIMEOUT = 120 COUNTER_EZSP_BUFFERS = "EZSP_FREE_BUFFERS" @@ -97,25 +99,17 @@ def multicast(self): """Return EZSP MulticastController.""" return self._multicast - async def add_endpoint( - self, - endpoint=1, - profile_id=zigpy.profiles.zha.PROFILE_ID, - device_id=0xBEEF, - app_flags=0x00, - input_clusters=[], - output_clusters=[], - ): + async def add_endpoint(self, descriptor: zdo_t.SimpleDescriptor) -> None: """Add endpoint.""" res = await self._ezsp.addEndpoint( - endpoint, - profile_id, - device_id, - app_flags, - len(input_clusters), - len(output_clusters), - input_clusters, - output_clusters, + descriptor.endpoint, + descriptor.profile, + descriptor.device_type, + descriptor.device_version, + len(descriptor.input_clusters), + len(descriptor.output_clusters), + descriptor.input_clusters, + descriptor.output_clusters, ) LOGGER.debug("Ezsp adding endpoint: %s", res) @@ -127,8 +121,16 @@ async def cleanup_tc_link_key(self, ieee: t.EmberEUI64) -> None: status = await self._ezsp.eraseKeyTableEntry(index) LOGGER.debug("Cleaned up TC link key for %s device: %s", ieee, status) - async def startup(self, auto_form=False): - """Perform a complete application startup""" + async def _get_board_info(self) -> tuple[str, str, str] | tuple[None, None, None]: + """Get the board info, handling errors when `getMfgToken` is not supported.""" + try: + return await self._ezsp.get_board_info() + except EzspError as exc: + LOGGER.info("EZSP Radio does not support getMfgToken command: %r", exc) + + return None, None, None + + async def connect(self): self._ezsp = await bellows.ezsp.EZSP.initialize(self.config) ezsp = self._ezsp @@ -141,47 +143,37 @@ async def startup(self, auto_form=False): self._in_flight_msg = asyncio.Semaphore(count) LOGGER.debug("APS_UNICAST_MESSAGE_COUNT is set to %s", count) - await self.add_endpoint( - output_clusters=[zigpy.zcl.clusters.security.IasZone.cluster_id] - ) + await self.register_endpoints() - try: - brd_manuf, brd_name, version = await self._ezsp.get_board_info() - LOGGER.info("EZSP Radio manufacturer: %s", brd_manuf) - LOGGER.info("EZSP Radio board name: %s", brd_name) - LOGGER.info("EmberZNet version: %s", version) - except EzspError as exc: - LOGGER.info("EZSP Radio does not support getMfgToken command: %s", str(exc)) + brd_manuf, brd_name, version = await self._get_board_info() + LOGGER.info("EZSP Radio manufacturer: %s", brd_manuf) + LOGGER.info("EZSP Radio board name: %s", brd_name) + LOGGER.info("EmberZNet version: %s", version) - v = await ezsp.networkInit() - if v[0] != t.EmberStatus.SUCCESS: - if not auto_form: - raise ControllerError("Could not initialize network") - await self.form_network() + async def _ensure_network_running(self) -> bool: + """ + Ensures the network is currently running and returns whether or not the network + was started. + """ + (state,) = await self._ezsp.networkState() - status, node_type, nwk_params = await ezsp.getNetworkParameters() - assert status == t.EmberStatus.SUCCESS # TODO: Better check - if node_type != t.EmberNodeType.COORDINATOR: - if not auto_form: - raise ControllerError("Network not configured as coordinator") + if state == self._ezsp.types.EmberNetworkStatus.JOINED_NETWORK: + return False - LOGGER.info( - "Leaving current network as %s and forming new network", node_type.name - ) - (status,) = await self._ezsp.leaveNetwork() - assert status == t.EmberStatus.NETWORK_DOWN - await self.form_network() - status, node_type, nwk_params = await ezsp.getNetworkParameters() - assert status == t.EmberStatus.SUCCESS + (init_status,) = await self._ezsp.networkInit() + if init_status != t.EmberStatus.SUCCESS: + raise NetworkNotFormed(f"Failed to init network: {init_status!r}") + + return True + + async def start_network(self): + ezsp = self._ezsp + + await self._ensure_network_running() - LOGGER.info("Node type: %s, Network parameters: %s", node_type, nwk_params) await ezsp.update_policies(self.config) - (nwk,) = await ezsp.getNodeId() - (ieee,) = await ezsp.getEui64() + await self.load_network_info(load_devices=False) - node_info = app_state.NodeInfo(nwk, ieee, node_type.zdo_logical_type) - self.state.node_information = node_info - self.state.network_information = nwk_params.zigpy_network_information for cnt_group in self.state.counters: cnt_group.reset() @@ -193,54 +185,257 @@ async def startup(self, auto_form=False): self.controller_event.set() self._watchdog_task = asyncio.create_task(self._watchdog()) - self.handle_join(self.nwk, self.ieee, 0) - LOGGER.debug("EZSP nwk=0x%04x, IEEE=%s", self._nwk, str(self._ieee)) + ezsp_device = EZSPCoordinator( + application=self, + ieee=self.state.node_info.ieee, + nwk=self.state.node_info.nwk, + ) + self.devices[self.state.node_info.ieee] = ezsp_device - await self.multicast.startup(self.get_device(self.ieee)) + # The coordinator device does not respond to attribute reads + ezsp_device.endpoints[1] = EZSPCoordinator.EZSPEndpoint(ezsp_device, 1) + ezsp_device.model = ezsp_device.endpoints[1].model + ezsp_device.manufacturer = ezsp_device.endpoints[1].manufacturer + await ezsp_device.schedule_initialize() - async def shutdown(self): - """Shutdown and cleanup ControllerApplication.""" - LOGGER.info("Shutting down ControllerApplication") - self.controller_event.clear() - if self._watchdog_task and not self._watchdog_task.done(): - LOGGER.debug("Cancelling watchdog") - self._watchdog_task.cancel() - if self._reset_task and not self._reset_task.done(): - self._reset_task.cancel() - if self._ezsp is not None: - self._ezsp.close() + await self.multicast.startup(ezsp_device) + + async def load_network_info(self, *, load_devices=False) -> None: + ezsp = self._ezsp + + await self._ensure_network_running() + + status, node_type, nwk_params = await ezsp.getNetworkParameters() + assert status == t.EmberStatus.SUCCESS + + if node_type != t.EmberNodeType.COORDINATOR: + raise NetworkNotFormed("Device not configured as coordinator") + + (nwk,) = await ezsp.getNodeId() + (ieee,) = await ezsp.getEui64() + + self.state.node_info = zigpy.state.NodeInfo( + nwk=zigpy.types.NWK(nwk), + ieee=zigpy.types.EUI64(ieee), + logical_type=node_type.zdo_logical_type, + ) + + (status, security_level) = await ezsp.getConfigurationValue( + ezsp.types.EzspConfigId.CONFIG_SECURITY_LEVEL + ) + assert status == t.EmberStatus.SUCCESS + security_level = zigpy.types.uint8_t(security_level) + + (status, network_key) = await ezsp.getKey( + ezsp.types.EmberKeyType.CURRENT_NETWORK_KEY + ) + assert status == t.EmberStatus.SUCCESS + + (status, ezsp_tc_link_key) = await ezsp.getKey( + ezsp.types.EmberKeyType.TRUST_CENTER_LINK_KEY + ) + assert status == t.EmberStatus.SUCCESS + tc_link_key = util.ezsp_key_to_zigpy_key(ezsp_tc_link_key, ezsp) + + (status, state) = await ezsp.getCurrentSecurityState() + assert status == t.EmberStatus.SUCCESS + + stack_specific = {} + + if ( + ezsp.types.EmberCurrentSecurityBitmask.TRUST_CENTER_USES_HASHED_LINK_KEY + in state.bitmask + ): + stack_specific["ezsp"] = {"hashed_tclk": tc_link_key.key.serialize().hex()} + tc_link_key.key = zigpy.types.KeyData(b"ZigBeeAlliance09") + + # The TCLK IEEE address is returned as `FF:FF:FF:FF:FF:FF:FF:FF` + if self.state.node_info.logical_type == zdo_t.LogicalType.Coordinator: + tc_link_key.partner_ieee = self.state.node_info.ieee + + brd_manuf, brd_name, version = await self._get_board_info() + + self.state.network_info = zigpy.state.NetworkInfo( + source=f"bellows@{bellows.__version__}", + extended_pan_id=zigpy.types.ExtendedPanId(nwk_params.extendedPanId), + pan_id=zigpy.types.PanId(nwk_params.panId), + nwk_update_id=zigpy.types.uint8_t(nwk_params.nwkUpdateId), + nwk_manager_id=zigpy.types.NWK(nwk_params.nwkManagerId), + channel=zigpy.types.uint8_t(nwk_params.radioChannel), + channel_mask=zigpy.types.Channels(nwk_params.channels), + security_level=zigpy.types.uint8_t(security_level), + network_key=util.ezsp_key_to_zigpy_key(network_key, ezsp), + tc_link_key=tc_link_key, + key_table=[], + children=[], + nwk_addresses={}, + stack_specific=stack_specific, + metadata={ + "ezsp": { + "manufacturer": brd_manuf, + "board": brd_name, + "version": version, + "stack_version": ezsp.ezsp_version, + } + }, + ) - async def form_network(self): - nwk = self.config[zigpy.config.CONF_NWK] + if not load_devices: + return + + for idx in range(0, 192): + (status, key) = await ezsp.getKeyTableEntry(idx) + + if status == t.EmberStatus.INDEX_OUT_OF_RANGE: + break + elif status == t.EmberStatus.TABLE_ENTRY_ERASED: + continue + + assert status == t.EmberStatus.SUCCESS + + self.state.network_info.key_table.append( + util.ezsp_key_to_zigpy_key(key, ezsp) + ) + + for idx in range(0, 255 + 1): + (status, *rsp) = await ezsp.getChildData(idx) + + if status == t.EmberStatus.NOT_JOINED: + continue + + if ezsp.ezsp_version >= 7: + nwk = rsp[0].id + eui64 = rsp[0].eui64 + node_type = rsp[0].type + else: + nwk, eui64, node_type = rsp + + self.state.network_info.children.append(eui64) + self.state.network_info.nwk_addresses[eui64] = nwk + + # v4 can crash when getAddressTableRemoteNodeId(32) is received + # Error code: undefined_0x8a + if ezsp.ezsp_version == 4: + return + + for idx in range(0, 255 + 1): + (nwk,) = await ezsp.getAddressTableRemoteNodeId(idx) + (eui64,) = await ezsp.getAddressTableRemoteEui64(idx) + + # Ignore invalid NWK entries + if nwk in t.EmberDistinguishedNodeId.__members__.values(): + continue + + self.state.network_info.nwk_addresses[ + zigpy.types.EUI64(eui64) + ] = zigpy.types.NWK(nwk) + + async def write_network_info( + self, *, network_info: zigpy.state.NetworkInfo, node_info: zigpy.state.NodeInfo + ) -> None: + ezsp = self._ezsp + + try: + (status,) = await ezsp.leaveNetwork() + if status != t.EmberStatus.NETWORK_DOWN: + raise FormationFailure("Couldn't leave network") + except bellows.exception.EzspError: + pass + + stack_specific = network_info.stack_specific.get("ezsp", {}) + (current_eui64,) = await ezsp.getEui64() + + if ( + node_info.ieee != zigpy.types.EUI64.UNKNOWN + and node_info.ieee != current_eui64 + ): + should_update_eui64 = stack_specific.get( + "i_understand_i_can_update_eui64_only_once_and_i_still_want_to_do_it" + ) - pan_id = nwk[zigpy.config.CONF_NWK_PAN_ID] - if pan_id is None: - pan_id = int.from_bytes(os.urandom(2), byteorder="little") + if should_update_eui64: + new_ncp_eui64 = t.EmberEUI64(node_info.ieee) + (status,) = await ezsp.setMfgToken( + t.EzspMfgTokenId.MFG_CUSTOM_EUI_64, new_ncp_eui64.serialize() + ) + assert status == t.EmberStatus.SUCCESS + else: + LOGGER.warning( + "Current node's IEEE address (%s) does not match the backup's (%s)", + current_eui64, + node_info.ieee, + ) - extended_pan_id = nwk[zigpy.config.CONF_NWK_EXTENDED_PAN_ID] - if extended_pan_id is None: - extended_pan_id = t.EmberEUI64([t.uint8_t(0)] * 8) + use_hashed_tclk = ezsp.ezsp_version > 4 - hashed_tclk = self._ezsp.ezsp_version > 4 - initial_security_state = bellows.zigbee.util.zha_security( - nwk, controller=True, hashed_tclk=hashed_tclk + if use_hashed_tclk and not stack_specific.get("hashed_tclk"): + # Generate a random default + network_info.stack_specific.setdefault("ezsp", {})[ + "hashed_tclk" + ] = os.urandom(16).hex() + + initial_security_state = util.zha_security( + network_info=network_info, + node_info=node_info, + use_hashed_tclk=use_hashed_tclk, ) - v = await self._ezsp.setInitialSecurityState(initial_security_state) - assert v[0] == t.EmberStatus.SUCCESS # TODO: Better check + (status,) = await ezsp.setInitialSecurityState(initial_security_state) + assert status == t.EmberStatus.SUCCESS + + # Clear the key table + (status,) = await ezsp.clearKeyTable() + assert status == t.EmberStatus.SUCCESS + + # Write APS link keys + for key in network_info.key_table: + ember_key = util.zigpy_key_to_ezsp_key(key, ezsp) + + # XXX: is there no way to set the outgoing frame counter or seq? + (status,) = await ezsp.addOrUpdateKeyTableEntry( + ember_key.partnerEUI64, True, ember_key.key + ) + if status != t.EmberStatus.SUCCESS: + LOGGER.warning("Couldn't add %s key: %s", key, status) + + if ezsp.ezsp_version > 4: + # Set NWK frame counter + (status,) = await ezsp.setValue( + ezsp.types.EzspValueId.VALUE_NWK_FRAME_COUNTER, + t.uint32_t(network_info.network_key.tx_counter).serialize(), + ) + assert status == t.EmberStatus.SUCCESS + + # Set APS frame counter + (status,) = await ezsp.setValue( + ezsp.types.EzspValueId.VALUE_APS_FRAME_COUNTER, + t.uint32_t(network_info.tc_link_key.tx_counter).serialize(), + ) + assert status == t.EmberStatus.SUCCESS + + # Set the network settings parameters = t.EmberNetworkParameters() - parameters.panId = t.EmberPanId(pan_id) - parameters.extendedPanId = extended_pan_id + parameters.panId = t.EmberPanId(network_info.pan_id) + parameters.extendedPanId = t.EmberEUI64(network_info.extended_pan_id) parameters.radioTxPower = t.uint8_t(8) - parameters.radioChannel = t.uint8_t(nwk[zigpy.config.CONF_NWK_CHANNEL]) + parameters.radioChannel = t.uint8_t(network_info.channel) parameters.joinMethod = t.EmberJoinMethod.USE_MAC_ASSOCIATION - parameters.nwkManagerId = t.EmberNodeId(0) - parameters.nwkUpdateId = t.uint8_t(nwk[zigpy.config.CONF_NWK_UPDATE_ID]) - parameters.channels = nwk[zigpy.config.CONF_NWK_CHANNELS] + parameters.nwkManagerId = t.EmberNodeId(network_info.nwk_manager_id) + parameters.nwkUpdateId = t.uint8_t(network_info.nwk_update_id) + parameters.channels = t.Channels(network_info.channel_mask) - await self._ezsp.formNetwork(parameters) - await self._ezsp.setValue( - self._ezsp.types.EzspValueId.VALUE_STACK_TOKEN_WRITING, 1 - ) + await ezsp.formNetwork(parameters) + await ezsp.setValue(ezsp.types.EzspValueId.VALUE_STACK_TOKEN_WRITING, 1) + + async def disconnect(self): + # TODO: how do you shut down the stack? + self.controller_event.clear() + if self._watchdog_task and not self._watchdog_task.done(): + LOGGER.debug("Cancelling watchdog") + self._watchdog_task.cancel() + if self._reset_task and not self._reset_task.done(): + self._reset_task.cancel() + if self._ezsp is not None: + self._ezsp.close() async def force_remove(self, dev): # This should probably be delivered to the parent device instead @@ -277,13 +472,17 @@ def _handle_frame( ) -> None: if message_type == t.EmberIncomingMessageType.INCOMING_BROADCAST: self.state.counters[COUNTERS_CTRL][COUNTER_RX_BCAST].increment() - dst_addressing = Addressing.nwk(0xFFFE, aps_frame.destinationEndpoint) + dst_addressing = zigpy.types.Addressing.nwk( + 0xFFFE, aps_frame.destinationEndpoint + ) elif message_type == t.EmberIncomingMessageType.INCOMING_MULTICAST: self.state.counters[COUNTERS_CTRL][COUNTER_RX_MCAST].increment() - dst_addressing = Addressing.group(aps_frame.groupId) + dst_addressing = zigpy.types.Addressing.group(aps_frame.groupId) elif message_type == t.EmberIncomingMessageType.INCOMING_UNICAST: self.state.counters[COUNTERS_CTRL][COUNTER_RX_UNICAST].increment() - dst_addressing = Addressing.nwk(self.nwk, aps_frame.destinationEndpoint) + dst_addressing = zigpy.types.Addressing.nwk( + self.state.node_info.nwk, aps_frame.destinationEndpoint + ) else: dst_addressing = None @@ -653,7 +852,7 @@ async def broadcast( radius, sequence, data, - broadcast_address=BroadcastAddress.RX_ON_WHEN_IDLE, + broadcast_address=zigpy.types.BroadcastAddress.RX_ON_WHEN_IDLE, ): """Submit and send data out as an unicast transmission. @@ -796,10 +995,10 @@ def handle_route_error(self, status: t.EmberStatus, nwk: t.EmberNodeId) -> None: dev.relays = None -class EZSPCoordinator(CustomDevice): +class EZSPCoordinator(zigpy.device.Device): """Zigpy Device representing Coordinator.""" - class EZSPEndpoint(CustomEndpoint): + class EZSPEndpoint(zigpy.endpoint.Endpoint): @property def manufacturer(self) -> str: """Manufacturer.""" @@ -836,20 +1035,3 @@ async def remove_from_group(self, grp_id: int) -> t.EmberStatus: app.groups[grp_id].remove_member(self) return status - - signature = { - "endpoints": { - 1: { - "profile_id": 0x0104, - "device_type": 0xBEEF, - "input_clusters": [], - "output_clusters": [zigpy.zcl.clusters.security.IasZone.cluster_id], - } - } - } - - replacement = { - "endpoints": {1: (EZSPEndpoint, {})}, - "manufacturer": "Silicon Labs", - "model": "EZSP", - } diff --git a/bellows/zigbee/util.py b/bellows/zigbee/util.py index ac0b9c41..1c615418 100644 --- a/bellows/zigbee/util.py +++ b/bellows/zigbee/util.py @@ -1,39 +1,96 @@ -import os -from typing import Any, Dict +import logging -import zigpy.config +import zigpy.state +import zigpy.types as zigpy_t +import zigpy.zdo.types as zdo_t import bellows.types as t +LOGGER = logging.getLogger(__name__) -def zha_security( - config: Dict[str, Any], controller: bool = False, hashed_tclk: bool = True -) -> None: +def zha_security( + *, + network_info: zigpy.state.NetworkInfo, + node_info: zigpy.state.NodeInfo, + use_hashed_tclk: bool, +) -> t.EmberInitialSecurityState: + """Construct an `EmberInitialSecurityState` out of zigpy network state.""" isc = t.EmberInitialSecurityState() isc.bitmask = ( t.EmberInitialSecurityBitmask.HAVE_PRECONFIGURED_KEY | t.EmberInitialSecurityBitmask.REQUIRE_ENCRYPTED_KEY + | t.EmberInitialSecurityBitmask.TRUST_CENTER_GLOBAL_LINK_KEY + | t.EmberInitialSecurityBitmask.HAVE_NETWORK_KEY ) - isc.preconfiguredKey = t.EmberKeyData(config[zigpy.config.CONF_NWK_TC_LINK_KEY]) - nwk_key = config[zigpy.config.CONF_NWK_KEY] - if nwk_key is None: - nwk_key = os.urandom(16) - isc.networkKey = t.EmberKeyData(nwk_key) - isc.networkKeySequenceNumber = t.uint8_t(config[zigpy.config.CONF_NWK_KEY_SEQ]) - tc_addr = config[zigpy.config.CONF_NWK_TC_ADDRESS] - if tc_addr is None: - tc_addr = [0x00] * 8 - isc.preconfiguredTrustCenterEui64 = t.EmberEUI64(tc_addr) - - if controller: - isc.bitmask |= ( - t.EmberInitialSecurityBitmask.TRUST_CENTER_GLOBAL_LINK_KEY - | t.EmberInitialSecurityBitmask.HAVE_NETWORK_KEY + isc.networkKey = t.EmberKeyData(network_info.network_key.key) + isc.networkKeySequenceNumber = t.uint8_t(network_info.network_key.seq) + + if ( + node_info.logical_type != zdo_t.LogicalType.Coordinator + and network_info.tc_link_key.partner_ieee != zigpy_t.EUI64.UNKNOWN + ): + isc.bitmask |= t.EmberInitialSecurityBitmask.HAVE_TRUST_CENTER_EUI64 + isc.preconfiguredTrustCenterEui64 = t.EmberEUI64( + network_info.tc_link_key.partner_ieee + ) + else: + isc.preconfiguredTrustCenterEui64 = t.EmberEUI64([0x00] * 8) + + if use_hashed_tclk: + if network_info.tc_link_key.key != zigpy_t.KeyData(b"ZigBeeAlliance09"): + LOGGER.warning("Only the well-known TC Link Key is supported") + + isc.bitmask |= t.EmberInitialSecurityBitmask.TRUST_CENTER_USES_HASHED_LINK_KEY + isc.preconfiguredKey, _ = t.EmberKeyData.deserialize( + bytes.fromhex(network_info.stack_specific["ezsp"]["hashed_tclk"]) ) - if hashed_tclk: - isc.preconfiguredKey = t.EmberKeyData(os.urandom(16)) - isc.bitmask |= ( - t.EmberInitialSecurityBitmask.TRUST_CENTER_USES_HASHED_LINK_KEY - ) + else: + isc.preconfiguredKey = t.EmberKeyData(network_info.tc_link_key.key) + return isc + + +def ezsp_key_to_zigpy_key(key, ezsp) -> zigpy.state.Key: + """Convert an `EmberKeyStruct` into a zigpy `Key`.""" + zigpy_key = zigpy.state.Key() + zigpy_key.key = zigpy_t.KeyData(key.key) + + if ezsp.types.EmberKeyStructBitmask.KEY_HAS_SEQUENCE_NUMBER in key.bitmask: + zigpy_key.seq = key.sequenceNumber + + if ezsp.types.EmberKeyStructBitmask.KEY_HAS_OUTGOING_FRAME_COUNTER in key.bitmask: + zigpy_key.tx_counter = key.outgoingFrameCounter + + if ezsp.types.EmberKeyStructBitmask.KEY_HAS_INCOMING_FRAME_COUNTER in key.bitmask: + zigpy_key.rx_counter = key.incomingFrameCounter + + if ezsp.types.EmberKeyStructBitmask.KEY_HAS_PARTNER_EUI64 in key.bitmask: + zigpy_key.partner_ieee = key.partnerEUI64 + + return zigpy_key + + +def zigpy_key_to_ezsp_key(zigpy_key: zigpy.state.Key, ezsp): + """Convert a zigpy `Key` into a `EmberKeyStruct`.""" + key = ezsp.types.EmberKeyStruct() + key.key = ezsp.types.EmberKeyData(zigpy_key.key) + key.bitmask = ezsp.types.EmberKeyStructBitmask(0) + + if zigpy_key.seq is not None: + key.sequenceNumber = t.uint8_t(zigpy_key.seq) + key.bitmask |= ezsp.types.EmberKeyStructBitmask.KEY_HAS_SEQUENCE_NUMBER + + if zigpy_key.tx_counter is not None: + key.outgoingFrameCounter = t.uint32_t(zigpy_key.tx_counter) + key.bitmask |= ezsp.types.EmberKeyStructBitmask.KEY_HAS_OUTGOING_FRAME_COUNTER + + if zigpy_key.rx_counter is not None: + key.outgoingFrameCounter = t.uint32_t(zigpy_key.rx_counter) + key.bitmask |= ezsp.types.EmberKeyStructBitmask.KEY_HAS_INCOMING_FRAME_COUNTER + + if zigpy_key.partner_ieee is not None: + key.partnerEUI64 = t.EmberEUI64(zigpy_key.partner_ieee) + key.bitmask |= ezsp.types.EmberKeyStructBitmask.KEY_HAS_PARTNER_EUI64 + + return key diff --git a/requirements_test.txt b/requirements_test.txt index beb7dcb7..6b9647b0 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -4,6 +4,6 @@ asynctest coveralls==2.1.2 pre-commit pytest -pytest-asyncio +pytest-asyncio>=0.17 pytest-cov pytest-timeout diff --git a/setup.cfg b/setup.cfg index 3f801541..51ea8f48 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,6 +12,9 @@ ignore = E501, D202 +per-file-ignores = + tests/*:F811,F401,F403 + [isort] profile = black # will group `import x` and `from x import` of the same module. @@ -19,3 +22,6 @@ force_sort_within_sections = true known_first_party = bellows,tests forced_separate = tests combine_as_imports = true + +[tool:pytest] +asyncio_mode = auto diff --git a/setup.py b/setup.py index 72a87189..38eaaea0 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ "pyserial", "pyserial-asyncio", "voluptuous", - "zigpy>=0.37.0", + "zigpy>=0.47.0", ], dependency_links=[ "https://codeload.github.com/rcloran/pure-pcapy-3/zip/master", diff --git a/tests/test_application.py b/tests/test_application.py index 59c71bbb..677f371d 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -3,8 +3,7 @@ import pytest import zigpy.config -from zigpy.device import Device -from zigpy.zcl.clusters import security +import zigpy.exceptions import zigpy.zdo.types as zdo_t import bellows.config as config @@ -21,8 +20,6 @@ from .async_mock import AsyncMock, MagicMock, PropertyMock, patch, sentinel -pytestmark = pytest.mark.asyncio - APP_CONFIG = { config.CONF_DEVICE: { config.CONF_DEVICE_PATH: "/dev/null", @@ -99,7 +96,9 @@ def ieee(init=0): @patch("zigpy.device.Device._initialize", new=AsyncMock()) @patch("bellows.zigbee.application.ControllerApplication._watchdog", new=AsyncMock()) -async def _test_startup(app, nwk_type, ieee, auto_form=False, init=0, ezsp_version=4): +async def _test_startup( + app, nwk_type, ieee, auto_form=False, init=0, ezsp_version=4, board_info=True +): nwk_params = bellows.types.struct.EmberNetworkParameters( extendedPanId=t.ExtendedPanId.convert("aa:bb:cc:dd:ee:ff:aa:bb"), panId=t.EmberPanId(0x55AA), @@ -117,6 +116,7 @@ async def mock_leave(*args, **kwargs): app._in_flight_msg = None ezsp_mock = MagicMock() + ezsp_mock.types = ezsp_t7 type(ezsp_mock).ezsp_version = PropertyMock(return_value=ezsp_version) ezsp_mock.initialize = AsyncMock(return_value=ezsp_mock) ezsp_mock.connect = AsyncMock() @@ -126,20 +126,63 @@ async def mock_leave(*args, **kwargs): ezsp_mock.setConfigurationValue = AsyncMock(return_value=t.EmberStatus.SUCCESS) ezsp_mock.networkInit = AsyncMock(return_value=[init]) ezsp_mock.getNetworkParameters = AsyncMock(return_value=[0, nwk_type, nwk_params]) - ezsp_mock.get_board_info = AsyncMock( - return_value=("Mock Manufacturer", "Mock board", "Mock version") - ) + + if board_info: + ezsp_mock.get_board_info = AsyncMock( + return_value=("Mock Manufacturer", "Mock board", "Mock version") + ) + else: + ezsp_mock.get_board_info = AsyncMock(side_effect=EzspError("Not supported")) + ezsp_mock.setPolicy = AsyncMock(return_value=[t.EmberStatus.SUCCESS]) ezsp_mock.getMfgToken = AsyncMock(return_value=(b"Some token\xff",)) ezsp_mock.getNodeId = AsyncMock(return_value=[t.EmberNodeId(0x0000)]) ezsp_mock.getEui64 = AsyncMock(return_value=[ieee]) ezsp_mock.getValue = AsyncMock(return_value=(0, b"\x01" * 6)) ezsp_mock.leaveNetwork = AsyncMock(side_effect=mock_leave) - app.form_network = AsyncMock() ezsp_mock.reset = AsyncMock() ezsp_mock.version = AsyncMock() ezsp_mock.getConfigurationValue = AsyncMock(return_value=(0, 1)) ezsp_mock.update_policies = AsyncMock() + ezsp_mock.networkState = AsyncMock( + return_value=[ezsp_mock.types.EmberNetworkStatus.JOINED_NETWORK] + ) + ezsp_mock.getKey = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + t.EmberKeyStruct( + bitmask=t.EmberKeyStructBitmask.KEY_HAS_SEQUENCE_NUMBER + | t.EmberKeyStructBitmask.KEY_HAS_OUTGOING_FRAME_COUNTER, + type=t.EmberKeyType.CURRENT_NETWORK_KEY, + key=t.EmberKeyData(b"ActualNetworkKey"), + outgoingFrameCounter=t.uint32_t(0x12345678), + incomingFrameCounter=t.uint32_t(0x00000000), + sequenceNumber=t.uint8_t(1), + partnerEUI64=t.EmberEUI64.convert("ff:ff:ff:ff:ff:ff:ff:ff"), + ), + ] + ) + + ezsp_mock.getCurrentSecurityState = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + t.EmberCurrentSecurityState( + bitmask=t.EmberCurrentSecurityBitmask.TRUST_CENTER_USES_HASHED_LINK_KEY, + trustCenterLongAddress=t.EmberEUI64.convert("ff:ff:ff:ff:ff:ff:ff:ff"), + ), + ] + ) + ezsp_mock.pre_permit = AsyncMock() + app.permit = AsyncMock() + + def form_network(): + ezsp_mock.getNetworkParameters.return_value = [ + 0, + t.EmberNodeType.COORDINATOR, + nwk_params, + ] + + app.form_network = AsyncMock(side_effect=form_network) p1 = patch.object(bellows.ezsp, "EZSP", new=ezsp_mock) p2 = patch.object(bellows.multicast.Multicast, "startup") @@ -156,8 +199,8 @@ async def test_startup(app, ieee): async def test_startup_nwk_params(app, ieee): assert app.pan_id == 0xFFFE assert app.extended_pan_id == t.ExtendedPanId.convert("ff:ff:ff:ff:ff:ff:ff:ff") - assert app.channel is None - assert app.channels is None + assert app.channel == 0 + assert app.channels == t.Channels.NO_CHANNELS assert app.nwk_update_id == 0 await _test_startup(app, t.EmberNodeType.COORDINATOR, ieee) @@ -189,7 +232,7 @@ async def test_startup_ezsp_ver8(app, ieee): async def test_startup_no_status(app, ieee): """Test when NCP is not a coordinator and not auto forming.""" - with pytest.raises(ControllerError): + with pytest.raises(zigpy.exceptions.NetworkNotFormed): await _test_startup( app, t.EmberNodeType.UNKNOWN_DEVICE, ieee, auto_form=False, init=1 ) @@ -204,7 +247,7 @@ async def test_startup_no_status_form(app, ieee): async def test_startup_end(app, ieee): """Test when NCP is a End Device and not auto forming.""" - with pytest.raises(ControllerError): + with pytest.raises(zigpy.exceptions.NetworkNotFormed): await _test_startup( app, t.EmberNodeType.SLEEPY_END_DEVICE, ieee, auto_form=False ) @@ -215,14 +258,12 @@ async def test_startup_end_form(app, ieee): await _test_startup(app, t.EmberNodeType.SLEEPY_END_DEVICE, ieee, auto_form=True) -async def test_form_network(app): - f = asyncio.Future() - f.set_result([0]) - app._ezsp.setInitialSecurityState.side_effect = [f] - app._ezsp.formNetwork = AsyncMock() - app._ezsp.setValue = AsyncMock() +async def test_startup_no_board_info(app, ieee, caplog): + """Test when NCP does not support `get_board_info`.""" + with caplog.at_level(logging.INFO): + await _test_startup(app, t.EmberNodeType.COORDINATOR, ieee, board_info=False) - await app.form_network() + assert "EZSP Radio does not support getMfgToken command" in caplog.text def _frame_handler( @@ -1048,12 +1089,12 @@ async def test_shutdown(app): @pytest.fixture def coordinator(app, ieee): - dev = Device(app, ieee, 0x0000) - ep = dev.add_endpoint(1) - ep.profile_id = 0x0104 - ep.device_type = 0xBEEF - ep.add_output_cluster(security.IasZone.cluster_id) - return bellows.zigbee.application.EZSPCoordinator(app, ieee, 0x0000, dev) + dev = bellows.zigbee.application.EZSPCoordinator(app, ieee, 0x0000) + dev.endpoints[1] = bellows.zigbee.application.EZSPCoordinator.EZSPEndpoint(dev, 1) + dev.model = dev.endpoints[1].model + dev.manufacturer = dev.endpoints[1].manufacturer + + return dev async def test_ezsp_add_to_group(coordinator): @@ -1447,7 +1488,7 @@ async def test_set_mfg_id(ieee, expected_mfg_id, app, ezsp_mock): sentinel.parent, ], ) - await asyncio.sleep(0.03) + await asyncio.sleep(0.20) if expected_mfg_id is not None: assert ezsp_mock.setManufacturerCode.await_count == 2 assert ezsp_mock.setManufacturerCode.await_args_list[0][0][0] == expected_mfg_id @@ -1457,3 +1498,44 @@ async def test_set_mfg_id(ieee, expected_mfg_id, app, ezsp_mock): ) else: assert ezsp_mock.setManufacturerCode.await_count == 0 + + +async def test_ensure_network_running_joined(app): + ezsp = app._ezsp + ezsp.networkState = AsyncMock( + return_value=[ezsp.types.EmberNetworkStatus.JOINED_NETWORK] + ) + + rsp = await app._ensure_network_running() + + assert not rsp + + ezsp.networkInit.assert_not_called() + + +async def test_ensure_network_running_not_joined_failure(app): + ezsp = app._ezsp + ezsp.networkState = AsyncMock( + return_value=[ezsp.types.EmberNetworkStatus.NO_NETWORK] + ) + ezsp.networkInit = AsyncMock(return_value=[ezsp.types.EmberStatus.INVALID_CALL]) + + with pytest.raises(zigpy.exceptions.NetworkNotFormed): + await app._ensure_network_running() + + ezsp.networkState.assert_called_once() + ezsp.networkInit.assert_called_once() + + +async def test_ensure_network_running_not_joined_success(app): + ezsp = app._ezsp + ezsp.networkState = AsyncMock( + return_value=[ezsp.types.EmberNetworkStatus.NO_NETWORK] + ) + ezsp.networkInit = AsyncMock(return_value=[ezsp.types.EmberStatus.SUCCESS]) + + rsp = await app._ensure_network_running() + assert rsp + + ezsp.networkState.assert_called_once() + ezsp.networkInit.assert_called_once() diff --git a/tests/test_application_network_state.py b/tests/test_application_network_state.py new file mode 100644 index 00000000..74d1fff7 --- /dev/null +++ b/tests/test_application_network_state.py @@ -0,0 +1,451 @@ +import pytest +import zigpy.state +import zigpy.types as zigpy_t +import zigpy.zdo.types as zdo_t + +import bellows +from bellows.exception import EzspError +import bellows.types as t + +from tests.async_mock import AsyncMock, PropertyMock +from tests.test_application import app, ezsp_mock + + +@pytest.fixture +def node_info(): + return zigpy.state.NodeInfo( + nwk=zigpy_t.NWK(0x0000), + ieee=zigpy_t.EUI64.convert("00:12:4b:00:1c:a1:b8:46"), + logical_type=zdo_t.LogicalType.Coordinator, + ) + + +@pytest.fixture +def network_info(node_info): + return zigpy.state.NetworkInfo( + extended_pan_id=zigpy_t.ExtendedPanId.convert("bd:27:0b:38:37:95:dc:87"), + pan_id=zigpy_t.PanId(0x9BB0), + nwk_update_id=18, + nwk_manager_id=zigpy_t.NWK(0x0000), + channel=zigpy_t.uint8_t(15), + channel_mask=zigpy_t.Channels.ALL_CHANNELS, + security_level=zigpy_t.uint8_t(5), + network_key=zigpy.state.Key( + key=zigpy_t.KeyData.convert("2ccade06b3090c310315b3d574d3c85a"), + seq=108, + tx_counter=118785, + ), + tc_link_key=zigpy.state.Key( + key=zigpy_t.KeyData(b"ZigBeeAlliance09"), + partner_ieee=node_info.ieee, + tx_counter=8712428, + ), + key_table=[ + zigpy.state.Key( + key=zigpy_t.KeyData.convert( + "85:7C:05:00:3E:76:1A:F9:68:9A:49:41:6A:60:5C:76" + ), + tx_counter=3792973670, + rx_counter=1083290572, + seq=147, + partner_ieee=zigpy_t.EUI64.convert("CC:CC:CC:FF:FE:E6:8E:CA"), + ), + zigpy.state.Key( + key=zigpy_t.KeyData.convert( + "CA:02:E8:BB:75:7C:94:F8:93:39:D3:9C:B3:CD:A7:BE" + ), + tx_counter=2597245184, + rx_counter=824424412, + seq=19, + partner_ieee=zigpy_t.EUI64.convert("EC:1B:BD:FF:FE:2F:41:A4"), + ), + ], + children=[zigpy_t.EUI64.convert("00:0B:57:FF:FE:2B:D4:57")], + # If exposed by the stack, NWK addresses of other connected devices on the network + nwk_addresses={ + # Two routers + zigpy_t.EUI64.convert("CC:CC:CC:FF:FE:E6:8E:CA"): zigpy_t.NWK(0x44CB), + zigpy_t.EUI64.convert("EC:1B:BD:FF:FE:2F:41:A4"): zigpy_t.NWK(0x0702), + # Child device + zigpy_t.EUI64.convert("00:0b:57:ff:fe:2b:d4:57"): zigpy_t.NWK(0xC06B), + }, + stack_specific={"ezsp": {"hashed_tclk": "abcdabcdabcdabcdabcdabcdabcdabcd"}}, + source=f"bellows@{bellows.__version__}", + ) + + +def _mock_app_for_load(app): + """Mock methods on the application and EZSP objects to run network state code.""" + ezsp = app._ezsp + + app._ensure_network_running = AsyncMock() + ezsp.getNetworkParameters = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + t.EmberNodeType.COORDINATOR, + t.EmberNetworkParameters( + extendedPanId=t.ExtendedPanId.convert("bd:27:0b:38:37:95:dc:87"), + panId=t.EmberPanId(0x9BB0), + radioTxPower=8, + radioChannel=15, + joinMethod=t.EmberJoinMethod.USE_MAC_ASSOCIATION, + nwkManagerId=t.EmberNodeId(0x0000), + nwkUpdateId=18, + channels=t.Channels.ALL_CHANNELS, + ), + ] + ) + + ezsp.getNodeId = AsyncMock(return_value=[t.EmberNodeId(0x0000)]) + ezsp.getEui64 = AsyncMock( + return_value=[t.EmberEUI64.convert("00:12:4b:00:1c:a1:b8:46")] + ) + ezsp.getConfigurationValue = AsyncMock(return_value=[t.EmberStatus.SUCCESS, 5]) + + def get_key(key_type): + key = { + ezsp.types.EmberKeyType.CURRENT_NETWORK_KEY: ezsp.types.EmberKeyStruct( + bitmask=( + t.EmberKeyStructBitmask.KEY_HAS_OUTGOING_FRAME_COUNTER + | t.EmberKeyStructBitmask.KEY_HAS_SEQUENCE_NUMBER + ), + type=ezsp.types.EmberKeyType.CURRENT_NETWORK_KEY, + key=t.EmberKeyData(bytes.fromhex("2ccade06b3090c310315b3d574d3c85a")), + outgoingFrameCounter=118785, + incomingFrameCounter=0, + sequenceNumber=108, + partnerEUI64=t.EmberEUI64.convert("00:00:00:00:00:00:00:00"), + ), + ezsp.types.EmberKeyType.TRUST_CENTER_LINK_KEY: ezsp.types.EmberKeyStruct( + bitmask=( + t.EmberKeyStructBitmask.KEY_IS_AUTHORIZED + | t.EmberKeyStructBitmask.KEY_HAS_PARTNER_EUI64 + | t.EmberKeyStructBitmask.KEY_HAS_OUTGOING_FRAME_COUNTER + ), + type=ezsp.types.EmberKeyType.TRUST_CENTER_LINK_KEY, + key=t.EmberKeyData(bytes.fromhex("abcdabcdabcdabcdabcdabcdabcdabcd")), + outgoingFrameCounter=8712428, + incomingFrameCounter=0, + sequenceNumber=0, + partnerEUI64=t.EmberEUI64.convert("00:12:4b:00:1c:a1:b8:46"), + ), + }[key_type] + + return (t.EmberStatus.SUCCESS, key) + + ezsp.getKey = AsyncMock(side_effect=get_key) + ezsp.getCurrentSecurityState = AsyncMock( + return_value=( + t.EmberStatus.SUCCESS, + ezsp.types.EmberCurrentSecurityState( + bitmask=( + t.EmberCurrentSecurityBitmask.TRUST_CENTER_USES_HASHED_LINK_KEY + | 64 + | 32 + | t.EmberCurrentSecurityBitmask.HAVE_TRUST_CENTER_LINK_KEY + | t.EmberCurrentSecurityBitmask.GLOBAL_LINK_KEY + ), + trustCenterLongAddress=t.EmberEUI64.convert("00:12:4b:00:1c:a1:b8:46"), + ), + ) + ) + + +async def test_load_network_info_no_devices(app, network_info, node_info): + """Test `load_network_info(load_devices=False)`""" + _mock_app_for_load(app) + + await app.load_network_info(load_devices=False) + + assert app.state.node_info == node_info + assert app.state.network_info == network_info.replace( + key_table=[], + children=[], + nwk_addresses={}, + metadata=app.state.network_info.metadata, + ) + + +@pytest.mark.parametrize("ezsp_ver", [4, 6, 7]) +async def test_load_network_info_with_devices(app, network_info, node_info, ezsp_ver): + """Test `load_network_info(load_devices=True)`""" + _mock_app_for_load(app) + + def get_child_data_v6(index): + if index == 0: + status = t.EmberStatus.SUCCESS + else: + status = t.EmberStatus.NOT_JOINED + + return ( + status, + t.EmberNodeId(0xC06B), + t.EmberEUI64.convert("00:0b:57:ff:fe:2b:d4:57"), + t.EmberNodeType.SLEEPY_END_DEVICE, + ) + + def get_child_data_v7(index): + if index == 0: + status = t.EmberStatus.SUCCESS + else: + status = t.EmberStatus.NOT_JOINED + + return ( + status, + app._ezsp.types.EmberChildData( + eui64=t.EmberEUI64.convert("00:0b:57:ff:fe:2b:d4:57"), + type=t.EmberNodeType.SLEEPY_END_DEVICE, + id=t.EmberNodeId(0xC06B), + phy=0, + power=128, + timeout=3, + ), + ) + + app._ezsp.ezsp_version = ezsp_ver + app._ezsp.getChildData = AsyncMock( + side_effect={ + 7: get_child_data_v7, + 6: get_child_data_v6, + 4: get_child_data_v6, + }[ezsp_ver] + ) + + def get_key_table_entry(index): + if index == 0: + return ( + t.EmberStatus.SUCCESS, + app._ezsp.types.EmberKeyStruct( + bitmask=( + t.EmberKeyStructBitmask.KEY_IS_AUTHORIZED + | t.EmberKeyStructBitmask.KEY_HAS_PARTNER_EUI64 + | t.EmberKeyStructBitmask.KEY_HAS_INCOMING_FRAME_COUNTER + | t.EmberKeyStructBitmask.KEY_HAS_OUTGOING_FRAME_COUNTER + ), + type=app._ezsp.types.EmberKeyType.APPLICATION_LINK_KEY, + key=t.EmberKeyData( + bytes.fromhex("857C05003E761AF9689A49416A605C76") + ), + outgoingFrameCounter=3792973670, + incomingFrameCounter=1083290572, + sequenceNumber=147, + partnerEUI64=t.EmberEUI64.convert("CC:CC:CC:FF:FE:E6:8E:CA"), + ), + ) + elif index == 1: + return ( + t.EmberStatus.SUCCESS, + app._ezsp.types.EmberKeyStruct( + bitmask=( + t.EmberKeyStructBitmask.KEY_IS_AUTHORIZED + | t.EmberKeyStructBitmask.KEY_HAS_PARTNER_EUI64 + | t.EmberKeyStructBitmask.KEY_HAS_INCOMING_FRAME_COUNTER + | t.EmberKeyStructBitmask.KEY_HAS_OUTGOING_FRAME_COUNTER + ), + type=app._ezsp.types.EmberKeyType.APPLICATION_LINK_KEY, + key=t.EmberKeyData( + bytes.fromhex("CA02E8BB757C94F89339D39CB3CDA7BE") + ), + outgoingFrameCounter=2597245184, + incomingFrameCounter=824424412, + sequenceNumber=19, + partnerEUI64=t.EmberEUI64.convert("EC:1B:BD:FF:FE:2F:41:A4"), + ), + ) + elif index >= 12: + status = t.EmberStatus.INDEX_OUT_OF_RANGE + else: + status = t.EmberStatus.TABLE_ENTRY_ERASED + + return ( + status, + app._ezsp.types.EmberKeyStruct( + bitmask=t.EmberKeyStructBitmask(244), + type=app._ezsp.types.EmberKeyType(0x46), + key=t.EmberKeyData(bytes.fromhex("b8a11c004b1200cdabcdabcdabcdabcd")), + outgoingFrameCounter=8192, + incomingFrameCounter=0, + sequenceNumber=0, + partnerEUI64=t.EmberEUI64.convert("00:12:4b:00:1c:a1:b8:46"), + ), + ) + + app._ezsp.getKeyTableEntry = AsyncMock(side_effect=get_key_table_entry) + + def get_addr_table_node_id(index): + return ( + { + 16: t.EmberNodeId(0x44CB), + 17: t.EmberNodeId(0x0702), + }.get(index, t.EmberNodeId(0xFFFF)), + ) + + app._ezsp.getAddressTableRemoteNodeId = AsyncMock( + side_effect=get_addr_table_node_id + ) + + def get_addr_table_eui64(index): + if index < 16: + return (t.EmberEUI64.convert("ff:ff:ff:ff:ff:ff:ff:ff"),) + elif 16 <= index <= 17: + return ( + { + 16: t.EmberEUI64.convert("cc:cc:cc:ff:fe:e6:8e:ca"), + 17: t.EmberEUI64.convert("ec:1b:bd:ff:fe:2f:41:a4"), + }[index], + ) + else: + return (t.EmberEUI64.convert("00:00:00:00:00:00:00:00"),) + + app._ezsp.getAddressTableRemoteEui64 = AsyncMock(side_effect=get_addr_table_eui64) + + await app.load_network_info(load_devices=True) + + nwk_addresses = network_info.nwk_addresses + + # v4 doesn't support address table reads so the only known addresses are from the + # `getChildData` call + if ezsp_ver == 4: + nwk_addresses = { + ieee: addr + for ieee, addr in network_info.nwk_addresses.items() + if ieee in network_info.children + } + else: + nwk_addresses = network_info.nwk_addresses + + # EZSP doesn't provide a command to set the key sequence number + assert app.state.network_info == network_info.replace( + key_table=[key.replace(seq=0) for key in network_info.key_table], + nwk_addresses=nwk_addresses, + metadata=app.state.network_info.metadata, + ) + assert app.state.node_info == node_info + + # No crash-prone calls were made + if ezsp_ver == 4: + app._ezsp.getAddressTableRemoteNodeId.assert_not_called() + app._ezsp.getAddressTableRemoteEui64.assert_not_called() + + +def _mock_app_for_write(app, network_info, node_info, ezsp_ver=None): + ezsp = app._ezsp + + ezsp.leaveNetwork = AsyncMock(return_value=[t.EmberStatus.NETWORK_DOWN]) + ezsp.getEui64 = AsyncMock( + return_value=[t.EmberEUI64.convert("00:12:4b:00:1c:a1:b8:46")] + ) + + ezsp.setInitialSecurityState = AsyncMock(return_value=[t.EmberStatus.SUCCESS]) + ezsp.clearKeyTable = AsyncMock(return_value=[t.EmberStatus.SUCCESS]) + ezsp.getConfigurationValue = AsyncMock( + return_value=[t.EmberStatus.SUCCESS, t.uint8_t(200)] + ) + ezsp.addOrUpdateKeyTableEntry = AsyncMock( + side_effect=[ + # Only the first one succeeds + (t.EmberStatus.SUCCESS,), + ] + + [ + # The rest will fail + (t.EmberStatus.TABLE_FULL,), + ] + * 20 + ) + + if ezsp_ver is not None: + ezsp.ezsp_version = ezsp_ver + + if ezsp_ver == 4: + ezsp.setValue = AsyncMock(return_value=[t.EmberStatus.BAD_ARGUMENT]) + else: + ezsp.setValue = AsyncMock(return_value=[t.EmberStatus.SUCCESS]) + + ezsp.formNetwork = AsyncMock(return_value=[t.EmberStatus.SUCCESS]) + ezsp.setValue = AsyncMock(return_value=[t.EmberStatus.SUCCESS]) + ezsp.setMfgToken = AsyncMock(return_value=[t.EmberStatus.SUCCESS]) + + +async def test_write_network_info_failed_leave1(app, network_info, node_info): + _mock_app_for_write(app, network_info, node_info) + + app._ezsp.leaveNetwork.return_value = [t.EmberStatus.BAD_ARGUMENT] + + with pytest.raises(zigpy.exceptions.FormationFailure): + await app.write_network_info(network_info=network_info, node_info=node_info) + + +async def test_write_network_info_failed_leave2(app, network_info, node_info): + _mock_app_for_write(app, network_info, node_info) + + app._ezsp.leaveNetwork.side_effect = EzspError("failed to leave network") + + await app.write_network_info(network_info=network_info, node_info=node_info) + + +@pytest.mark.parametrize("ezsp_ver", [4, 7]) +async def test_write_network_info(app, network_info, node_info, ezsp_ver): + _mock_app_for_write(app, network_info, node_info, ezsp_ver) + + await app.write_network_info(network_info=network_info, node_info=node_info) + + +async def test_write_network_info_write_new_eui64(app, network_info, node_info): + _mock_app_for_write(app, network_info, node_info) + + # Differs from what is in `node_info` + app._ezsp.getEui64.return_value = [t.EmberEUI64.convert("AA:AA:AA:AA:AA:AA:AA:AA")] + + await app.write_network_info( + network_info=network_info.replace( + stack_specific={ + "ezsp": { + "i_understand_i_can_update_eui64_only_once_and_i_still_want_to_do_it": True, + **network_info.stack_specific["ezsp"], + } + } + ), + node_info=node_info, + ) + + # The EUI64 is written + expected_eui64 = t.EmberEUI64(node_info.ieee).serialize() + app._ezsp.setMfgToken.assert_called_once_with( + t.EzspMfgTokenId.MFG_CUSTOM_EUI_64, expected_eui64 + ) + + +async def test_write_network_info_dont_write_new_eui64(app, network_info, node_info): + _mock_app_for_write(app, network_info, node_info) + + # Differs from what is in `node_info` + app._ezsp.getEui64.return_value = [t.EmberEUI64.convert("AA:AA:AA:AA:AA:AA:AA:AA")] + + await app.write_network_info( + # We don't provide the magic key so nothing is written + network_info=network_info, + node_info=node_info, + ) + + # The EUI64 is *not* written + app._ezsp.setMfgToken.assert_not_called() + + +async def test_write_network_info_generate_hashed_tclk(app, network_info, node_info): + _mock_app_for_write(app, network_info, node_info) + + seen_keys = set() + + for i in range(10): + app._ezsp.setInitialSecurityState.reset_mock() + + await app.write_network_info( + network_info=network_info.replace(stack_specific={}), + node_info=node_info, + ) + + call = app._ezsp.setInitialSecurityState.mock_calls[0] + seen_keys.add(tuple(call[1][0].preconfiguredKey)) + + # A new hashed key is randomly generated each time if none is provided + assert len(seen_keys) == 10 diff --git a/tests/test_cli.py b/tests/test_cli.py index ecf8d4f2..d7aca4c1 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,3 +1,3 @@ -import bellows.cli # noqa: F401 +import bellows.cli # Just being able to import is a small test... diff --git a/tests/test_ezsp.py b/tests/test_ezsp.py index 77b2d252..6597f42f 100644 --- a/tests/test_ezsp.py +++ b/tests/test_ezsp.py @@ -15,8 +15,6 @@ config.CONF_DEVICE_BAUDRATE: 115200, } -pytestmark = pytest.mark.asyncio - @pytest.fixture async def ezsp_f(): diff --git a/tests/test_ezsp_protocol.py b/tests/test_ezsp_protocol.py index 754ad2ba..94595378 100644 --- a/tests/test_ezsp_protocol.py +++ b/tests/test_ezsp_protocol.py @@ -8,8 +8,6 @@ from .async_mock import AsyncMock, MagicMock, patch -pytestmark = pytest.mark.asyncio - class _DummyProtocolHandler(bellows.ezsp.v4.EZSPv4): """Protocol handler mock.""" @@ -29,7 +27,6 @@ def prot_hndl(): return _DummyProtocolHandler(MagicMock(), MagicMock()) -@pytest.mark.asyncio async def test_command(prot_hndl): coro = prot_hndl.command("nop") prot_hndl._awaiting[prot_hndl._seq - 1][2].set_result(True) @@ -73,7 +70,6 @@ def test_receive_reply_invalid_command(prot_hndl): assert prot_hndl.cb_mock.call_count == 0 -@pytest.mark.asyncio async def test_cfg_initialize(prot_hndl, caplog): """Test initialization.""" @@ -94,7 +90,6 @@ async def test_cfg_initialize(prot_hndl, caplog): assert "Couldn't set" in caplog.text -@pytest.mark.asyncio async def test_update_policies(prot_hndl): """Test update_policies.""" @@ -107,7 +102,6 @@ async def test_update_policies(prot_hndl): await prot_hndl.update_policies({"ezsp_policies": {}}) -@pytest.mark.asyncio @pytest.mark.parametrize( "status, raw, expected_value", ( diff --git a/tests/test_ezsp_v5.py b/tests/test_ezsp_v5.py index 41a78396..714bc25d 100644 --- a/tests/test_ezsp_v5.py +++ b/tests/test_ezsp_v5.py @@ -4,8 +4,6 @@ from .async_mock import AsyncMock, MagicMock, patch -pytestmark = pytest.mark.asyncio - @pytest.fixture def ezsp_f(): diff --git a/tests/test_ezsp_v8.py b/tests/test_ezsp_v8.py index 6ea14672..a39f9384 100644 --- a/tests/test_ezsp_v8.py +++ b/tests/test_ezsp_v8.py @@ -4,8 +4,6 @@ from .async_mock import AsyncMock, MagicMock, patch -pytestmark = pytest.mark.asyncio - @pytest.fixture def ezsp_f(): @@ -27,7 +25,6 @@ def test_ezsp_frame_rx(ezsp_f): assert ezsp_f._handle_callback.call_args[0][1] == [0x01, 0x02, 0x1234] -@pytest.mark.asyncio async def test_set_source_routing(ezsp_f): """Test setting source routing.""" with patch.object( @@ -37,7 +34,6 @@ async def test_set_source_routing(ezsp_f): assert src_mock.await_count == 1 -@pytest.mark.asyncio async def test_pre_permit(ezsp_f): """Test pre permit.""" p1 = patch.object(ezsp_f, "setPolicy", new=AsyncMock()) diff --git a/tests/test_ezsp_v9.py b/tests/test_ezsp_v9.py index 4cb499fd..f4876b4c 100644 --- a/tests/test_ezsp_v9.py +++ b/tests/test_ezsp_v9.py @@ -4,8 +4,6 @@ from .async_mock import AsyncMock, MagicMock, patch -pytestmark = pytest.mark.asyncio - @pytest.fixture def ezsp_f(): @@ -27,7 +25,6 @@ def test_ezsp_frame_rx(ezsp_f): assert ezsp_f._handle_callback.call_args[0][1] == [0x01, 0x02, 0x1234] -@pytest.mark.asyncio async def test_set_source_routing(ezsp_f): """Test setting source routing.""" with patch.object( @@ -37,7 +34,6 @@ async def test_set_source_routing(ezsp_f): assert src_mock.await_count == 1 -@pytest.mark.asyncio async def test_pre_permit(ezsp_f): """Test pre permit.""" p1 = patch.object(ezsp_f, "setPolicy", new=AsyncMock()) diff --git a/tests/test_multicast.py b/tests/test_multicast.py index 5ae884c1..47d5a6ee 100644 --- a/tests/test_multicast.py +++ b/tests/test_multicast.py @@ -7,8 +7,6 @@ from .async_mock import AsyncMock, MagicMock, sentinel -pytestmark = pytest.mark.asyncio - CUSTOM_SIZE = 12 @@ -25,7 +23,6 @@ def multicast(ezsp_f): return m -@pytest.mark.asyncio async def test_initialize(multicast): group_id = 0x0200 mct = active_multicasts = 4 @@ -49,7 +46,6 @@ async def mock_get(*args): assert len(multicast._available) == CUSTOM_SIZE - active_multicasts -@pytest.mark.asyncio async def test_initialize_fail_configured_size(multicast): multicast._ezsp.getConfigurationValue.return_value = t.EmberStatus.ERR_FATAL, 16 @@ -59,7 +55,6 @@ async def test_initialize_fail_configured_size(multicast): assert len(multicast._available) == 0 -@pytest.mark.asyncio async def test_initialize_fail(multicast): group_id = 0x0200 @@ -79,7 +74,6 @@ async def mock_get(*args): assert len(multicast._available) == 0 -@pytest.mark.asyncio async def test_startup(multicast): coordinator = MagicMock() @@ -107,7 +101,6 @@ async def mock_set(*args): return multicast.subscribe(group_id) -@pytest.mark.asyncio async def test_subscribe(multicast): grp_id = 0x0200 multicast._available.add(1) @@ -128,7 +121,6 @@ async def test_subscribe(multicast): assert grp_id in multicast._multicast -@pytest.mark.asyncio async def test_subscribe_fail(multicast): grp_id = 0x0200 multicast._available.add(1) @@ -142,7 +134,6 @@ async def test_subscribe_fail(multicast): assert len(multicast._available) == 1 -@pytest.mark.asyncio async def test_subscribe_no_avail(multicast): grp_id = 0x0200 @@ -161,7 +152,6 @@ async def mock_set(*args): return multicast.unsubscribe(group_id) -@pytest.mark.asyncio async def test_unsubscribe(multicast): grp_id = 0x0200 multicast._available.add(1) @@ -185,7 +175,6 @@ async def test_unsubscribe(multicast): assert len(multicast._available) == 1 -@pytest.mark.asyncio async def test_unsubscribe_fail(multicast): grp_id = 0x0200 multicast._available.add(1) diff --git a/tests/test_thread.py b/tests/test_thread.py index fa51cfee..561c7cc1 100644 --- a/tests/test_thread.py +++ b/tests/test_thread.py @@ -7,8 +7,6 @@ from bellows.thread import EventLoopThread, ThreadsafeProxy -pytestmark = pytest.mark.asyncio - async def test_thread_start(monkeypatch): current_loop = asyncio.get_event_loop() diff --git a/tests/test_types.py b/tests/test_types.py index e94785ab..ac2533af 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -1,5 +1,4 @@ import pytest -import zigpy.types as zt import zigpy.zdo.types as zdo_t import bellows.types as t @@ -222,26 +221,3 @@ def test_ember_node_type_to_zdo_logical_type(node_type, logical_type): node_type = t.EmberNodeType(node_type) assert node_type.zdo_logical_type == zdo_t.LogicalType(logical_type) - - -def test_ember_network_params_to_network_info(): - """Coverage for Ember Network param to zigpy network information converter.""" - - network_params = t.EmberNetworkParameters( - t.ExtendedPanId.convert("ff:7b:aa:bb:cc:dd:ee:ff"), - panId=0xD539, - radioTxPower=8, - radioChannel=20, - joinMethod=t.EmberJoinMethod.USE_MAC_ASSOCIATION, - nwkManagerId=0x1234, - nwkUpdateId=22, - channels=t.Channels.ALL_CHANNELS, - ) - network_info = network_params.zigpy_network_information - assert network_info.extended_pan_id == t.ExtendedPanId.convert( - "ff:7b:aa:bb:cc:dd:ee:ff" - ) - assert network_info.pan_id == zt.PanId(0xD539) - assert network_info.nwk_update_id == 22 - assert network_info.nwk_manager_id == zt.NWK(0x1234) - assert network_info.channel == 20 diff --git a/tests/test_uart.py b/tests/test_uart.py index 25d0c639..7d27ccf4 100644 --- a/tests/test_uart.py +++ b/tests/test_uart.py @@ -9,10 +9,9 @@ from .async_mock import AsyncMock, MagicMock, sentinel -pytestmark = pytest.mark.asyncio - -async def test_connect(monkeypatch): +@pytest.mark.parametrize("flow_control", [conf.CONF_FLOW_CONTROL_DEFAULT, "hardware"]) +async def test_connect(flow_control, monkeypatch): appmock = MagicMock() transport = MagicMock() @@ -24,7 +23,11 @@ async def mockconnect(loop, protocol_factory, **kwargs): monkeypatch.setattr(serial_asyncio, "create_serial_connection", mockconnect) gw = await uart.connect( conf.SCHEMA_DEVICE( - {conf.CONF_DEVICE_PATH: "/dev/serial", conf.CONF_DEVICE_BAUDRATE: 115200} + { + conf.CONF_DEVICE_PATH: "/dev/serial", + conf.CONF_DEVICE_BAUDRATE: 115200, + conf.CONF_FLOW_CONTROL: flow_control, + } ), appmock, use_thread=False, diff --git a/tests/test_util.py b/tests/test_util.py new file mode 100644 index 00000000..725b505f --- /dev/null +++ b/tests/test_util.py @@ -0,0 +1,145 @@ +import logging + +import pytest +import zigpy.state +import zigpy.types as t +import zigpy.zdo.types as zdo_t + +import bellows.types as bellows_t +import bellows.zigbee.util as util + +from tests.test_application import ezsp_mock +from tests.test_application_network_state import network_info, node_info + + +@pytest.fixture +def zigpy_key(network_info, node_info): + return network_info.network_key.replace( + rx_counter=1234567, + partner_ieee=node_info.ieee, + ) + + +@pytest.fixture +def ezsp_key(ezsp_mock, network_info, node_info, zigpy_key): + ezsp = ezsp_mock + return ezsp.types.EmberKeyStruct( + bitmask=( + ezsp.types.EmberKeyStructBitmask.KEY_HAS_SEQUENCE_NUMBER + | ezsp.types.EmberKeyStructBitmask.KEY_HAS_OUTGOING_FRAME_COUNTER + | ezsp.types.EmberKeyStructBitmask.KEY_HAS_INCOMING_FRAME_COUNTER + | ezsp.types.EmberKeyStructBitmask.KEY_HAS_PARTNER_EUI64 + ), + key=ezsp.types.EmberKeyData(network_info.network_key.key), + sequenceNumber=zigpy_key.seq, + outgoingFrameCounter=zigpy_key.tx_counter, + incomingFrameCounter=zigpy_key.rx_counter, + partnerEUI64=bellows_t.EmberEUI64(node_info.ieee), + ) + + +def test_zha_security_normal(network_info, node_info): + security = util.zha_security( + network_info=network_info, node_info=node_info, use_hashed_tclk=True + ) + + assert security.preconfiguredTrustCenterEui64 == bellows_t.EmberEUI64([0x00] * 8) + assert ( + bellows_t.EmberInitialSecurityBitmask.HAVE_TRUST_CENTER_EUI64 + not in security.bitmask + ) + + assert ( + security.preconfiguredKey.serialize().hex() + == network_info.stack_specific["ezsp"]["hashed_tclk"] + ) + assert ( + bellows_t.EmberInitialSecurityBitmask.TRUST_CENTER_USES_HASHED_LINK_KEY + in security.bitmask + ) + + +def test_zha_security_router(network_info, node_info): + security = util.zha_security( + network_info=network_info, + node_info=node_info.replace(logical_type=zdo_t.LogicalType.Router), + use_hashed_tclk=False, + ) + + assert security.preconfiguredTrustCenterEui64 == bellows_t.EmberEUI64( + network_info.tc_link_key.partner_ieee + ) + assert ( + bellows_t.EmberInitialSecurityBitmask.HAVE_TRUST_CENTER_EUI64 + in security.bitmask + ) + + assert security.preconfiguredKey == network_info.tc_link_key.key + assert ( + bellows_t.EmberInitialSecurityBitmask.TRUST_CENTER_USES_HASHED_LINK_KEY + not in security.bitmask + ) + + +def test_zha_security_router_unknown_tclk_partner_ieee(network_info, node_info): + security = util.zha_security( + network_info=network_info.replace( + tc_link_key=network_info.tc_link_key.replace(partner_ieee=t.EUI64.UNKNOWN) + ), + node_info=node_info.replace(logical_type=zdo_t.LogicalType.Router), + use_hashed_tclk=False, + ) + + # Not set, since we don't know it + assert security.preconfiguredTrustCenterEui64 == bellows_t.EmberEUI64([0x00] * 8) + assert ( + bellows_t.EmberInitialSecurityBitmask.HAVE_TRUST_CENTER_EUI64 + not in security.bitmask + ) + + +def test_zha_security_replace_missing_tc_partner_addr(network_info, node_info): + security = util.zha_security( + network_info=network_info.replace( + tc_link_key=network_info.tc_link_key.replace(partner_ieee=t.EUI64.UNKNOWN) + ), + node_info=node_info, + use_hashed_tclk=True, + ) + + assert node_info.ieee != t.EUI64.UNKNOWN + assert security.preconfiguredTrustCenterEui64 == bellows_t.EmberEUI64([0x00] * 8) + + +def test_zha_security_hashed_nonstandard_tclk_warning(network_info, node_info, caplog): + # Nothing should be logged normally + with caplog.at_level(logging.WARNING): + util.zha_security( + network_info=network_info, + node_info=node_info, + use_hashed_tclk=True, + ) + + assert "Only the well-known TC Link Key is supported" not in caplog.text + + # But it will be when a non-standard TCLK is used along with TCLK hashing + with caplog.at_level(logging.WARNING): + util.zha_security( + network_info=network_info.replace( + tc_link_key=network_info.tc_link_key.replace( + key=t.KeyData(b"ANonstandardTCLK") + ) + ), + node_info=node_info, + use_hashed_tclk=True, + ) + + assert "Only the well-known TC Link Key is supported" in caplog.text + + +def test_ezsp_key_to_zigpy_key(zigpy_key, ezsp_key, ezsp_mock): + return util.ezsp_key_to_zigpy_key(ezsp_key, ezsp_mock) == zigpy_key + + +def test_zigpy_key_to_ezsp_key(zigpy_key, ezsp_key, ezsp_mock): + return util.zigpy_key_to_ezsp_key(zigpy_key, ezsp_mock) == ezsp_key