diff --git a/tests/test_xbee.py b/tests/test_xbee.py index 21db589a2b..d49a5f1f0c 100644 --- a/tests/test_xbee.py +++ b/tests/test_xbee.py @@ -269,7 +269,7 @@ async def test_receive_serial_data(zigpy_device_from_quirk): None, b"2\x00\x02\x01\xff\xff\xff\xff\xff\xff\xff\xff\xff\xfe%V", b"\x01%V\x00\x0C\xE4", - "%v_command_response", + "percentv_command_response", 3300, ), # Write uint16_t argument @@ -278,7 +278,7 @@ async def test_receive_serial_data(zigpy_device_from_quirk): 2700, b"2\x00\x02\x01\xff\xff\xff\xff\xff\xff\xff\xff\xff\xfeV+\x0A\x8C", b"\x01V+\x00", - "v+_command_response", + "vplus_command_response", None, ), # Read uint8_t argument @@ -319,7 +319,7 @@ async def test_receive_serial_data(zigpy_device_from_quirk): ), # Command with no arguments ( - 94, + 93, None, b"2\x00\x02\x01\xff\xff\xff\xff\xff\xff\xff\xff\xff\xfeAS", b"\x01AS\x00", @@ -417,7 +417,7 @@ def mock_at_response(*args, **kwargs): # Write bool argument (31, "PM", True, None), # Command with no arguments - (94, "AS", None, None), + (93, "AS", None, None), ), ) async def test_remote_at_native( @@ -457,7 +457,9 @@ async def test_remote_at_native( ) assert status == foundation.Status.SUCCESS listener.zha_send_event.assert_called_once_with( - command.lower() + "_command_response", {"response": response_value} + command.replace("%V", "PercentV").replace("V+", "VPlus").lower() + + "_command_response", + {"response": response_value}, ) diff --git a/xbee.md b/xbee.md index c74159af86..31a0d8d188 100644 --- a/xbee.md +++ b/xbee.md @@ -1,3 +1,34 @@ +This document describes how to use Digi XBee device as a router or end device. + +## Using with non-XBee coordinator + +You may need to configure zigpy to listen to the appropriate additional endpoints which it ignores by default. This is an example config for HA ZHA: + +``` +zha: + zigpy_config: + additional_endpoints: + - endpoint: 0xE6 + profile: 0xC105 + device_type: 0x0000 + device_version: 0b0000 + input_clusters: [0xA1] + output_clusters: [0x21] + - endpoint: 0xE8 + profile: 0xC105 + device_type: 0x0000 + device_version: 0b0000 + input_clusters: [0x11, 0x92] + output_clusters: [0x11] +``` +If you are using `zigpy_znp`, you might also need need to add +``` + znp_config: + prefer_endpoint_1: false +``` +to the `zigpy_config:` section. +Please note that not all coordinators have been tested yet. + ## Digital GPIO Digital input/output pins are exposed as switches. @@ -58,7 +89,8 @@ automation: cluster_type: in command: 0 command_type: server - args: Assistant + params: + data: Assistant ``` ## Remote AT Commands @@ -98,4 +130,5 @@ automation: command_type: server cluster_type: out cluster_id: 33 + params: {} ``` diff --git a/zhaquirks/__init__.py b/zhaquirks/__init__.py index 5ebf7ca839..d6e439a451 100755 --- a/zhaquirks/__init__.py +++ b/zhaquirks/__init__.py @@ -120,7 +120,7 @@ def handle_cluster_request( ): self.listener_event( ZHA_SEND_EVENT, - self.server_commands.get(hdr.command_id, (hdr.command_id))[0], + self.server_commands.get(hdr.command_id, (hdr.command_id)).name, args, ) diff --git a/zhaquirks/xbee/__init__.py b/zhaquirks/xbee/__init__.py index 70c19b896d..c4e60302fb 100644 --- a/zhaquirks/xbee/__init__.py +++ b/zhaquirks/xbee/__init__.py @@ -236,7 +236,6 @@ def deserialize(cls, data): "CB": uint8_t, "DN": Bytes, # "up to 20-Byte printable ASCII string" "IS": None, - "1S": None, "AS": None, # Stuff I've guessed # "CE": uint8_t, @@ -346,7 +345,15 @@ class XBeeRemoteATRequest(LocalDataCluster): """Remote AT Command Request Cluster.""" cluster_id = XBEE_AT_REQUEST_CLUSTER - server_commands = {} + client_commands = {} + server_commands = { + k: foundation.ZCLCommandDef( + name=v[0].replace("%V", "PercentV").replace("V+", "VPlus"), + schema={"param?": v[1]} if v[1] else {}, + is_manufacturer_specific=True, + ) + for k, v in zip(range(1, len(AT_COMMANDS) + 1), AT_COMMANDS.items()) + } _seq: int = 1 @@ -381,14 +388,6 @@ def deserialize(cls, data): data = data[cls._size :] return r, data - def __init__(self, *args, **kwargs): - """Generate client_commands from AT_COMMANDS.""" - super().__init__(*args, **kwargs) - self.client_commands = { - k: (v[0], (v[1],), None) - for k, v in zip(range(1, len(AT_COMMANDS) + 1), AT_COMMANDS.items()) - } - def _save_at_request(self, frame_id, future): self._endpoint.in_clusters[XBEE_AT_RESPONSE_CLUSTER].save_at_request( frame_id, future @@ -443,8 +442,8 @@ async def _command(self, options, command, data, *args): 0x00, options, frame_id, - self._endpoint.device.application.ieee, - self._endpoint.device.application.nwk, + self._endpoint.device.application.state.node_info.ieee, + self._endpoint.device.application.state.node_info.nwk, command, data, ), @@ -471,25 +470,29 @@ async def _command(self, options, command, data, *args): return future async def command( - self, command_id, *args, manufacturer=None, expect_reply=False, tsn=None + self, + command_id, + param=None, + *args, + manufacturer=None, + expect_reply=False, + tsn=None, ): """Handle AT request.""" - command = self.client_commands[command_id][0] - try: - value = args[0] - if isinstance(value, dict): - value = None - except IndexError: - value = None - - if value is not None: - value = await self.remote_at_command(command, value) + command = ( + self.server_commands[command_id] + .name.replace("PercentV", "%V") + .replace("VPlus", "V+") + ) + + if param is not None: + value = await self.remote_at_command(command, param) else: value = await self.remote_at_command(command) tsn = self._endpoint.device.application.get_sequence() hdr = foundation.ZCLHeader.cluster(tsn, command_id) - self._endpoint.device.endpoints[232].out_clusters[ + self._endpoint.device.endpoints[XBEE_DATA_ENDPOINT].out_clusters[ LevelControl.cluster_id ].handle_cluster_request(hdr, {"response": value}) @@ -537,16 +540,13 @@ def handle_cluster_request( ): """Handle AT response.""" if hdr.command_id == DATA_IN_CMD: - frame_id = args[0] - cmd = args[1] - status = args[2] - value = args[3] _LOGGER.debug( - "Remote AT command response: %s", (frame_id, cmd, status, value) + "Remote AT command response: %s", + (args.frame_id, args.cmd, args.status, args.value), ) - (fut,) = self._awaiting.pop(frame_id) + (fut,) = self._awaiting.pop(args.frame_id) try: - status = self.ATCommandResult(status) + status = self.ATCommandResult(args.status) except ValueError: status = self.ATCommandResult.ERROR @@ -556,12 +556,12 @@ def handle_cluster_request( ) return - response_type = AT_COMMANDS[cmd.decode("ascii")] - if response_type is None or len(value) == 0: + response_type = AT_COMMANDS[args.cmd.decode("ascii")] + if response_type is None or len(args.value) == 0: fut.set_result(None) return - response, remains = response_type.deserialize(value) + response, remains = response_type.deserialize(args.value) fut.set_result(response) else: @@ -569,15 +569,15 @@ def handle_cluster_request( client_commands = {} server_commands = { - 0x0000: ( - "remote_at_response", - ( - uint8_t, - ATCommand, - uint8_t, - Bytes, - ), - None, + 0x0000: foundation.ZCLCommandDef( + name="remote_at_response", + schema={ + "frame_id": uint8_t, + "cmd": ATCommand, + "status": uint8_t, + "value": Bytes, + }, + is_manufacturer_specific=True, ) } @@ -588,7 +588,7 @@ class XBeeCommon(CustomDevice): def remote_at(self, command, *args, **kwargs): """Remote at command.""" return ( - self.endpoints[230] + self.endpoints[XBEE_AT_ENDPOINT] .out_clusters[XBEE_AT_REQUEST_CLUSTER] .remote_at_command(command, *args, apply_changes=True, **kwargs) ) @@ -663,12 +663,13 @@ def deserialize(cls, data): ) sample_index += 2 else: - analog_samples.append(0) + analog_samples.append(None) + for dpin in range(len(digital_pins)): + if digital_pins[dpin] == 0: + digital_samples[dpin] = None return ( { - "digital_pins": digital_pins, - "analog_pins": analog_pins, "digital_samples": digital_samples, "analog_samples": analog_samples, }, @@ -689,21 +690,25 @@ def handle_cluster_request( Update the digital pin states """ if hdr.command_id == ON_OFF_CMD: - values = args[0] - if "digital_pins" in values and "digital_samples" in values: + values = args.io_sample + if "digital_samples" in values: # Update digital inputs active_pins = [ - i for i, x in enumerate(values["digital_pins"]) if x == 1 + i + for i, x in enumerate(values["digital_samples"]) + if x is not None ] for pin in active_pins: # pylint: disable=W0212 self._endpoint.device[0xD0 + pin].on_off._update_attribute( ATTR_ON_OFF, values["digital_samples"][pin] ) - if "analog_pins" in values and "analog_samples" in values: + if "analog_samples" in values: # Update analog inputs active_pins = [ - i for i, x in enumerate(values["analog_pins"]) if x == 1 + i + for i, x in enumerate(values["analog_samples"]) + if x is not None ] for pin in active_pins: # pylint: disable=W0212 @@ -719,7 +724,13 @@ def handle_cluster_request( attributes = {0x0055: ("present_value", t.Bool)} client_commands = {} - server_commands = {0x0000: ("io_sample", (IOSample,), False)} + server_commands = { + 0x0000: foundation.ZCLCommandDef( + name="io_sample", + schema={"io_sample": IOSample}, + is_manufacturer_specific=True, + ) + } # pylint: disable=too-many-ancestors class EventRelayCluster(EventableCluster, LocalDataCluster, LevelControl): @@ -727,15 +738,18 @@ class EventRelayCluster(EventableCluster, LocalDataCluster, LevelControl): attributes = {} client_commands = {} - - def __init__(self, *args, **kwargs): - """Generate server_commands from AT_COMMANDS.""" - super().__init__(*args, **kwargs) - self.server_commands = { - k: (v[0].lower() + "_command_response", (str,), None) - for k, v in zip(range(1, len(AT_COMMANDS) + 1), AT_COMMANDS.items()) - } - self.server_commands[0x0000] = ("receive_data", (str,), None) + server_commands = { + k: foundation.ZCLCommandDef( + name=v[0].replace("%V", "PercentV").replace("V+", "VPlus").lower() + + "_command_response", + schema={"response?": v[1]} if v[1] else {}, + is_manufacturer_specific=True, + ) + for k, v in zip(range(1, len(AT_COMMANDS) + 1), AT_COMMANDS.items()) + } + server_commands[0x0000] = foundation.ZCLCommandDef( + name="receive_data", schema={"data": str}, is_manufacturer_specific=True + ) class SerialDataCluster(LocalDataCluster): """Serial Data Cluster for the XBee.""" @@ -757,10 +771,16 @@ def deserialize(cls, data): return (cls(data), b"") async def command( - self, command_id, *args, manufacturer=None, expect_reply=False, tsn=None + self, + command_id, + data, + *args, + manufacturer=None, + expect_reply=False, + tsn=None, ): """Handle outgoing data.""" - data = self.BinaryString(args[0]).serialize() + data = self.BinaryString(data).serialize() return foundation.GENERAL_COMMANDS[ foundation.GeneralCommand.Default_Response ].schema( @@ -792,21 +812,33 @@ def handle_cluster_request( if hdr.command_id == DATA_IN_CMD: self._endpoint.out_clusters[ LevelControl.cluster_id - ].handle_cluster_request(hdr, {"data": args[0]}) + ].handle_cluster_request(hdr, {"data": args.data}) else: super().handle_cluster_request(hdr, args) attributes = {} - client_commands = {0x0000: ("send_data", (BinaryString,), None)} - server_commands = {0x0000: ("receive_data", (BinaryString,), None)} + client_commands = { + 0x0000: foundation.ZCLCommandDef( + name="send_data", + schema={"data": BinaryString}, + is_manufacturer_specific=True, + ) + } + server_commands = { + 0x0000: foundation.ZCLCommandDef( + name="receive_data", + schema={"data": BinaryString}, + is_manufacturer_specific=True, + ) + } replacement = { ENDPOINTS: { - 230: { + XBEE_AT_ENDPOINT: { INPUT_CLUSTERS: [XBeeRemoteATResponse], OUTPUT_CLUSTERS: [XBeeRemoteATRequest], }, - 232: { + XBEE_DATA_ENDPOINT: { INPUT_CLUSTERS: [DigitalIOCluster, SerialDataCluster, XBeeBasic], OUTPUT_CLUSTERS: [SerialDataCluster, EventRelayCluster], },