# -*- coding: utf-8 -*- """ Copyright (C) 2024 Xiaomi Corporation. The ownership and intellectual property rights of Xiaomi Home Assistant Integration and related Xiaomi cloud service API interface provided under this license, including source code and object code (collectively, "Licensed Work"), are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi hereby grants you a personal, limited, non-exclusive, non-transferable, non-sublicensable, and royalty-free license to reproduce, use, modify, and distribute the Licensed Work only for your use of Home Assistant for non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize you to use the Licensed Work for any other purpose, including but not limited to use Licensed Work to develop applications (APP), Web services, and other forms of software. You may reproduce and distribute copies of the Licensed Work, with or without modifications, whether in source or object form, provided that you must give any other recipients of the Licensed Work a copy of this License and retain all copyright and disclaimers. Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible for any direct, indirect, special, incidental, or consequential damages or losses arising from the use or inability to use the Licensed Work. Xiaomi reserves all rights not expressly granted to you in this License. Except for the rights expressly granted by Xiaomi under this License, Xiaomi does not authorize you in any form to use the trademarks, copyrights, or other forms of intellectual property rights of Xiaomi and its affiliates, including, without limitation, without obtaining other written permission from Xiaomi, you shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that may make the public associate with Xiaomi in any form to publicize or promote the software or hardware devices that use the Licensed Work. Xiaomi has the right to immediately terminate all your authorization under this License in the event: 1. You assert patent invalidation, litigation, or other claims against patents or other intellectual property rights of Xiaomi or its affiliates; or, 2. You make, have made, manufacture, sell, or offer to sell products that knock off Xiaomi or its affiliates' products. MIoT lan device control, only support MIoT SPEC-v2 WiFi devices. """ import json import time import asyncio from dataclasses import dataclass from enum import Enum, auto import logging import os import queue import random import secrets import socket import struct import threading from typing import Callable, Optional, final from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives import padding from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes # pylint: disable=relative-beyond-top-level from .miot_error import MIoTErrorCode from .miot_ev import MIoTEventLoop, TimeoutHandle from .miot_network import InterfaceStatus, MIoTNetwork, NetworkInfo from .miot_mdns import MipsService, MipsServiceState from .common import randomize_int, MIoTMatcher _LOGGER = logging.getLogger(__name__) class MIoTLanCmdType(Enum): """MIoT lan command.""" DEINIT = 0 CALL_API = auto() SUB_DEVICE_STATE = auto() UNSUB_DEVICE_STATE = auto() REG_BROADCAST = auto() UNREG_BROADCAST = auto() GET_DEV_LIST = auto() DEVICE_UPDATE = auto() DEVICE_DELETE = auto() NET_INFO_UPDATE = auto() NET_IFS_UPDATE = auto() OPTIONS_UPDATE = auto() @dataclass class MIoTLanCmd: """MIoT lan command.""" type_: MIoTLanCmdType data: any @dataclass class MIoTLanCmdData: handler: Callable[[dict, any], None] handler_ctx: any timeout_ms: int @dataclass class MIoTLanGetDevListData(MIoTLanCmdData): ... @dataclass class MIoTLanCallApiData(MIoTLanCmdData): did: str msg: dict class MIoTLanSendBroadcastData(MIoTLanCallApiData): ... @dataclass class MIoTLanUnregisterBroadcastData: key: str @dataclass class MIoTLanRegisterBroadcastData: key: str handler: Callable[[dict, any], None] handler_ctx: any @dataclass class MIoTLanUnsubDeviceState: key: str @dataclass class MIoTLanSubDeviceState: key: str handler: Callable[[str, dict, any], None] handler_ctx: any @dataclass class MIoTLanNetworkUpdateData: status: InterfaceStatus if_name: str @dataclass class MIoTLanRequestData: msg_id: int handler: Callable[[dict, any], None] handler_ctx: any timeout: TimeoutHandle class MIoTLanDeviceState(Enum): FRESH = 0 PING1 = auto() PING2 = auto() PING3 = auto() DEAD = auto() class MIoTLanDevice: """MIoT lan device.""" # pylint: disable=unused-argument OT_HEADER: int = 0x2131 OT_HEADER_LEN: int = 32 NETWORK_UNSTABLE_CNT_TH: int = 10 NETWORK_UNSTABLE_TIME_TH: int = 120000 NETWORK_UNSTABLE_RESUME_TH: int = 300 FAST_PING_INTERVAL: int = 5000 CONSTRUCT_STATE_PENDING: int = 15000 KA_INTERVAL_MIN = 10000 KA_INTERVAL_MAX = 50000 did: str token: bytes cipher: Cipher ip: Optional[str] offset: int subscribed: bool sub_ts: int supported_wildcard_sub: bool _manager: any _if_name: Optional[str] _sub_locked: bool _state: MIoTLanDeviceState _online: bool _online_offline_history: list[dict[str, any]] _online_offline_timer: Optional[TimeoutHandle] _ka_timer: TimeoutHandle _ka_internal: int def __init__( self, manager: any, did: str, token: str, ip: Optional[str] = None ) -> None: self._manager: MIoTLan = manager self.did = did self.token = bytes.fromhex(token) aes_key: bytes = self.__md5(self.token) aex_iv: bytes = self.__md5(aes_key + self.token) self.cipher = Cipher( algorithms.AES128(aes_key), modes.CBC(aex_iv), default_backend()) self.ip = ip self.offset = 0 self.subscribed = False self.sub_ts = 0 self.supported_wildcard_sub = False self._if_name = None self._sub_locked = False self._state = MIoTLanDeviceState.DEAD self._online = False self._online_offline_history = [] self._online_offline_timer = None def ka_init_handler(ctx: any) -> None: self._ka_internal = self.KA_INTERVAL_MIN self.__update_keep_alive(state=MIoTLanDeviceState.DEAD) self._ka_timer = self._manager.mev.set_timeout( randomize_int(self.CONSTRUCT_STATE_PENDING, 0.5), ka_init_handler, None) _LOGGER.debug('miot lan device add, %s', self.did) def keep_alive(self, ip: str, if_name: str) -> None: self.ip = ip if self._if_name != if_name: self._if_name = if_name _LOGGER.info( 'device if_name change, %s, %s', self._if_name, self.did) self.__update_keep_alive(state=MIoTLanDeviceState.FRESH) @property def online(self) -> bool: return self._online @online.setter def online(self, online: bool) -> None: if self._online == online: return self._online = online self._manager.broadcast_device_state( did=self.did, state={ 'online': self._online, 'push_available': self.subscribed}) @property def if_name(self) -> Optional[str]: return self._if_name def gen_packet( self, out_buffer: bytearray, clear_data: dict, did: str, offset: int ) -> int: clear_bytes = json.dumps(clear_data).encode('utf-8') padder = padding.PKCS7(algorithms.AES128.block_size).padder() padded_data = padder.update(clear_bytes) + padder.finalize() if len(padded_data) + self.OT_HEADER_LEN > len(out_buffer): raise ValueError('rpc too long') encryptor = self.cipher.encryptor() encrypted_data = encryptor.update(padded_data) + encryptor.finalize() data_len: int = len(encrypted_data)+self.OT_HEADER_LEN out_buffer[:32] = struct.pack( '>HHQI16s', self.OT_HEADER, data_len, int(did), offset, self.token) out_buffer[32:data_len] = encrypted_data msg_md5: bytes = self.__md5(out_buffer[0:data_len]) out_buffer[16:32] = msg_md5 return data_len def decrypt_packet(self, encrypted_data: bytearray) -> dict: data_len: int = struct.unpack('>H', encrypted_data[2:4])[0] md5_orig: bytes = encrypted_data[16:32] encrypted_data[16:32] = self.token md5_calc: bytes = self.__md5(encrypted_data[0:data_len]) if md5_orig != md5_calc: raise ValueError(f'invalid md5, {md5_orig}, {md5_calc}') decryptor = self.cipher.decryptor() decrypted_padded_data = decryptor.update( encrypted_data[32:data_len]) + decryptor.finalize() unpadder = padding.PKCS7(algorithms.AES128.block_size).unpadder() decrypted_data = unpadder.update( decrypted_padded_data) + unpadder.finalize() # Some device will add a redundant \0 at the end of JSON string decrypted_data = decrypted_data.rstrip(b'\x00') return json.loads(decrypted_data) def subscribe(self) -> None: if self._sub_locked: return self._sub_locked = True try: sub_ts: int = int(time.time()) self._manager.send2device( did=self.did, msg={ 'method': 'miIO.sub', 'params': { 'version': '2.0', 'did': self._manager.virtual_did, 'update_ts': sub_ts, 'sub_method': '.' } }, handler=self.__subscribe_handler, handler_ctx=sub_ts, timeout_ms=5000) except Exception as err: # pylint: disable=broad-exception-caught _LOGGER.error('subscribe device error, %s', err) self._sub_locked = False def unsubscribe(self) -> None: if not self.subscribed: return self._manager.send2device( did=self.did, msg={ 'method': 'miIO.unsub', 'params': { 'version': '2.0', 'did': self._manager.virtual_did, 'update_ts': self.sub_ts or 0, 'sub_method': '.' } }, handler=self.__unsubscribe_handler, timeout_ms=5000) self.subscribed = False self._manager.broadcast_device_state( did=self.did, state={ 'online': self._online, 'push_available': self.subscribed}) def on_delete(self) -> None: if self._ka_timer: self._manager.mev.clear_timeout(self._ka_timer) if self._online_offline_timer: self._manager.mev.clear_timeout(self._online_offline_timer) self._manager = None self.cipher = None _LOGGER.debug('miot lan device delete, %s', self.did) def update_info(self, info: dict) -> None: if ( 'token' in info and len(info['token']) == 32 and info['token'].upper() != self.token.hex().upper() ): # Update token self.token = bytes.fromhex(info['token']) aes_key: bytes = self.__md5(self.token) aex_iv: bytes = self.__md5(aes_key + self.token) self.cipher = Cipher( algorithms.AES128(aes_key), modes.CBC(aex_iv), default_backend()) _LOGGER.debug('update token, %s', self.did) def __subscribe_handler(self, msg: dict, sub_ts: int) -> None: if ( 'result' not in msg or 'code' not in msg['result'] or msg['result']['code'] != 0 ): _LOGGER.error('subscribe device error, %s, %s', self.did, msg) return self.subscribed = True self.sub_ts = sub_ts self._manager.broadcast_device_state( did=self.did, state={ 'online': self._online, 'push_available': self.subscribed}) _LOGGER.info('subscribe success, %s, %s', self._if_name, self.did) def __unsubscribe_handler(self, msg: dict, ctx: any) -> None: if ( 'result' not in msg or 'code' not in msg['result'] or msg['result']['code'] != 0 ): _LOGGER.error('unsubscribe device error, %s, %s', self.did, msg) return _LOGGER.info('unsubscribe success, %s, %s', self._if_name, self.did) def __update_keep_alive(self, state: MIoTLanDeviceState) -> None: last_state: MIoTLanDeviceState = self._state self._state = state if self._state != MIoTLanDeviceState.FRESH: _LOGGER.debug('device status, %s, %s', self.did, self._state) if self._ka_timer: self._manager.mev.clear_timeout(self._ka_timer) self._ka_timer = None match state: case MIoTLanDeviceState.FRESH: if last_state == MIoTLanDeviceState.DEAD: self._ka_internal = self.KA_INTERVAL_MIN self.__change_online(True) self._ka_timer = self._manager.mev.set_timeout( self.__get_next_ka_timeout(), self.__update_keep_alive, MIoTLanDeviceState.PING1) case ( MIoTLanDeviceState.PING1 | MIoTLanDeviceState.PING2 | MIoTLanDeviceState.PING3 ): self._manager.ping(if_name=self._if_name, target_ip=self.ip) # Fast ping self._ka_timer = self._manager.mev.set_timeout( self.FAST_PING_INTERVAL, self.__update_keep_alive, MIoTLanDeviceState(state.value+1)) case MIoTLanDeviceState.DEAD: if last_state == MIoTLanDeviceState.PING3: self._ka_internal = self.KA_INTERVAL_MIN self.__change_online(False) case _: _LOGGER.error('invalid state, %s', state) def __get_next_ka_timeout(self) -> int: self._ka_internal = min(self._ka_internal*2, self.KA_INTERVAL_MAX) return randomize_int(self._ka_internal, 0.1) def __change_online(self, online: bool) -> None: _LOGGER.info('change online, %s, %s', self.did, online) ts_now: int = int(time.time()) self._online_offline_history.append({'ts': ts_now, 'online': online}) if len(self._online_offline_history) > self.NETWORK_UNSTABLE_CNT_TH: self._online_offline_history.pop(0) if self._online_offline_timer: self._manager.mev.clear_timeout(self._online_offline_timer) if not online: self.online = False else: if ( len(self._online_offline_history) < self.NETWORK_UNSTABLE_CNT_TH or ( ts_now - self._online_offline_history[0]['ts'] > self.NETWORK_UNSTABLE_TIME_TH) ): self.online = True else: _LOGGER.info('unstable device detected, %s', self.did) self._online_offline_timer = self._manager.mev.set_timeout( self.NETWORK_UNSTABLE_RESUME_TH, self.__online_resume_handler, None) def __online_resume_handler(self, ctx: any) -> None: _LOGGER.info('unstable resume threshold past, %s', self.did) self.online = True def __md5(self, data: bytes) -> bytes: hasher = hashes.Hash(hashes.MD5(), default_backend()) hasher.update(data) return hasher.finalize() class MIoTLan: """MIoT lan device control.""" # pylint: disable=unused-argument # pylint: disable=inconsistent-quotes OT_HEADER: bytes = b'\x21\x31' OT_PORT: int = 54321 OT_PROBE_LEN: int = 32 OT_MSG_LEN: int = 1400 OT_SUPPORT_WILDCARD_SUB: int = 0xFE OT_PROBE_INTERVAL_MIN: int = 5000 OT_PROBE_INTERVAL_MAX: int = 45000 _main_loop: asyncio.AbstractEventLoop _net_ifs: set[str] _network: MIoTNetwork _mips_service: MipsService _enable_subscribe: bool _lan_devices: dict[str, MIoTLanDevice] _virtual_did: str _probe_msg: bytes _write_buffer: bytearray _read_buffer: bytearray _mev: MIoTEventLoop _thread: threading.Thread _queue: queue.Queue _cmd_event_fdr: int _cmd_event_fdw: int _available_net_ifs: set[str] _broadcast_socks: dict[str, socket.socket] _local_port: Optional[int] _scan_timer: TimeoutHandle _last_scan_interval: Optional[int] _msg_id_counter: int _pending_requests: dict[int, MIoTLanRequestData] _device_msg_matcher: MIoTMatcher _device_state_sub_map: dict[str, MIoTLanSubDeviceState] _reply_msg_buffer: dict[str, TimeoutHandle] _lan_state_sub_map: dict[str, Callable[[bool], asyncio.Future]] _lan_ctrl_vote_map: dict[str, bool] _init_done: bool def __init__( self, net_ifs: list[str], network: MIoTNetwork, mips_service: MipsService, enable_subscribe: bool = False, virtual_did: Optional[int] = None, loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: if not network: raise ValueError('network is required') if not mips_service: raise ValueError('mips_service is required') self._main_loop = loop or asyncio.get_event_loop() self._net_ifs = set(net_ifs) self._network = network self._network.sub_network_info( key='miot_lan', handler=self.__on_network_info_change) self._mips_service = mips_service self._mips_service.sub_service_change( key='miot_lan', group_id='*', handler=self.__on_mips_service_change) self._enable_subscribe = enable_subscribe self._virtual_did = virtual_did or str(secrets.randbits(64)) # Init socket probe message probe_bytes = bytearray(self.OT_PROBE_LEN) probe_bytes[:20] = ( b'!1\x00\x20\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFFMDID') probe_bytes[20:28] = struct.pack('>Q', int(self._virtual_did)) probe_bytes[28:32] = b'\x00\x00\x00\x00' self._probe_msg = bytes(probe_bytes) self._read_buffer = bytearray(self.OT_MSG_LEN) self._write_buffer = bytearray(self.OT_MSG_LEN) self._lan_devices = {} self._available_net_ifs = set() self._broadcast_socks = {} self._local_port = None self._scan_timer = None self._last_scan_interval = None self._msg_id_counter = int(random.random()*0x7FFFFFFF) self._pending_requests = {} self._device_msg_matcher = MIoTMatcher() self._device_state_sub_map = {} self._reply_msg_buffer = {} self._lan_state_sub_map = {} self._lan_ctrl_vote_map = {} self._init_done = False if ( len(self._mips_service.get_services()) == 0 and len(self._net_ifs) > 0 ): _LOGGER.info('no central hub gateway service, init miot lan') self._main_loop.call_later( 0, lambda: self._main_loop.create_task( self.init_async())) @ property def virtual_did(self) -> str: return self._virtual_did @ property def mev(self) -> MIoTEventLoop: return self._mev @property def init_done(self) -> bool: return self._init_done async def init_async(self) -> None: if self._init_done: _LOGGER.info('miot lan already init') return if len(self._net_ifs) == 0: _LOGGER.info('no net_ifs') return if not any(self._lan_ctrl_vote_map.values()): _LOGGER.info('no vote for lan ctrl') return if len(self._mips_service.get_services()) > 0: _LOGGER.info('central hub gateway service exist') return for if_name in list(self._network.network_info.keys()): self._available_net_ifs.add(if_name) if len(self._available_net_ifs) == 0: _LOGGER.info('no available net_ifs') return if self._net_ifs.isdisjoint(self._available_net_ifs): _LOGGER.info('no valid net_ifs') return self._mev = MIoTEventLoop() self._queue = queue.Queue() self._cmd_event_fdr, self._cmd_event_fdw = os.pipe() self._mev.set_read_handler( self._cmd_event_fdr, self.__cmd_read_handler, None) self._thread = threading.Thread(target=self.__lan_thread_handler) self._thread.name = 'miot_lan' self._thread.daemon = True self._thread.start() self._init_done = True for handler in list(self._lan_state_sub_map.values()): self._main_loop.create_task(handler(True)) _LOGGER.info( 'miot lan init, %s ,%s', self._net_ifs, self._available_net_ifs) async def deinit_async(self) -> None: if not self._init_done: _LOGGER.info('miot lan not init') return self._init_done = False self.__lan_send_cmd(MIoTLanCmdType.DEINIT, None) self._thread.join() self._lan_devices = {} self._broadcast_socks = {} self._local_port = None self._scan_timer = None self._last_scan_interval = None self._msg_id_counter = int(random.random()*0x7FFFFFFF) self._pending_requests = {} self._device_msg_matcher = MIoTMatcher() self._device_state_sub_map = {} self._reply_msg_buffer = {} for handler in list(self._lan_state_sub_map.values()): self._main_loop.create_task(handler(False)) _LOGGER.info('miot lan deinit') async def update_net_ifs_async(self, net_ifs: list[str]) -> None: _LOGGER.info('update net_ifs, %s', net_ifs) if not isinstance(net_ifs, list): _LOGGER.error('invalid net_ifs, %s', net_ifs) return if len(net_ifs) == 0: # Deinit lan await self.deinit_async() self._net_ifs = set(net_ifs) return available_net_ifs = set() for if_name in list(self._network.network_info.keys()): available_net_ifs.add(if_name) if set(net_ifs).isdisjoint(available_net_ifs): _LOGGER.error('no valid net_ifs, %s', net_ifs) await self.deinit_async() self._net_ifs = set(net_ifs) self._available_net_ifs = available_net_ifs return if not self._init_done: self._net_ifs = set(net_ifs) await self.init_async() return self.__lan_send_cmd( cmd=MIoTLanCmdType.NET_IFS_UPDATE, data=net_ifs) async def vote_for_lan_ctrl_async(self, key: str, vote: bool) -> None: _LOGGER.info('vote for lan ctrl, %s, %s', key, vote) self._lan_ctrl_vote_map[key] = vote if not any(self._lan_ctrl_vote_map.values()): await self.deinit_async() return await self.init_async() async def update_subscribe_option(self, enable_subscribe: bool) -> None: _LOGGER.info('update subscribe option, %s', enable_subscribe) if not self._init_done: self._enable_subscribe = enable_subscribe return return self.__lan_send_cmd( cmd=MIoTLanCmdType.OPTIONS_UPDATE, data={ 'enable_subscribe': enable_subscribe, }) def update_devices(self, devices: dict[str, dict]) -> bool: _LOGGER.info('update devices, %s', devices) return self.__lan_send_cmd( cmd=MIoTLanCmdType.DEVICE_UPDATE, data=devices) def delete_devices(self, devices: list[str]) -> bool: _LOGGER.info('delete devices, %s', devices) return self.__lan_send_cmd( cmd=MIoTLanCmdType.DEVICE_DELETE, data=devices) def sub_lan_state( self, key: str, handler: Callable[[bool], asyncio.Future] ) -> None: self._lan_state_sub_map[key] = handler def unsub_lan_state(self, key: str) -> None: self._lan_state_sub_map.pop(key, None) @final def sub_device_state( self, key: str, handler: Callable[[str, dict, any], None], handler_ctx: any = None ) -> bool: return self.__lan_send_cmd( cmd=MIoTLanCmdType.SUB_DEVICE_STATE, data=MIoTLanSubDeviceState( key=key, handler=handler, handler_ctx=handler_ctx)) @final def unsub_device_state(self, key: str) -> bool: return self.__lan_send_cmd( cmd=MIoTLanCmdType.UNSUB_DEVICE_STATE, data=MIoTLanUnsubDeviceState(key=key)) @final def sub_prop( self, did: str, handler: Callable[[dict, any], None], siid: int = None, piid: int = None, handler_ctx: any = None ) -> bool: if not self._enable_subscribe: return False key = ( f'{did}/p/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') return self.__lan_send_cmd( cmd=MIoTLanCmdType.REG_BROADCAST, data=MIoTLanRegisterBroadcastData( key=key, handler=handler, handler_ctx=handler_ctx)) @final def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: if not self._enable_subscribe: return False key = ( f'{did}/p/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') return self.__lan_send_cmd( cmd=MIoTLanCmdType.UNREG_BROADCAST, data=MIoTLanUnregisterBroadcastData(key=key)) @final def sub_event( self, did: str, handler: Callable[[dict, any], None], siid: int = None, eiid: int = None, handler_ctx: any = None ) -> bool: if not self._enable_subscribe: return False key = ( f'{did}/e/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') return self.__lan_send_cmd( cmd=MIoTLanCmdType.REG_BROADCAST, data=MIoTLanRegisterBroadcastData( key=key, handler=handler, handler_ctx=handler_ctx)) @final def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: if not self._enable_subscribe: return False key = ( f'{did}/e/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') return self.__lan_send_cmd( cmd=MIoTLanCmdType.UNREG_BROADCAST, data=MIoTLanUnregisterBroadcastData(key=key)) @final async def get_prop_async( self, did: str, siid: int, piid: int, timeout_ms: int = 10000 ) -> any: result_obj = await self.__call_api_async( did=did, msg={ 'method': 'get_properties', 'params': [{'did': did, 'siid': siid, 'piid': piid}] }, timeout_ms=timeout_ms) if ( result_obj and 'result' in result_obj and len(result_obj['result']) == 1 and 'did' in result_obj['result'][0] and result_obj['result'][0]['did'] == did ): return result_obj['result'][0].get('value', None) return None @final async def set_prop_async( self, did: str, siid: int, piid: int, value: any, timeout_ms: int = 10000 ) -> dict: result_obj = await self.__call_api_async( did=did, msg={ 'method': 'set_properties', 'params': [{ 'did': did, 'siid': siid, 'piid': piid, 'value': value}] }, timeout_ms=timeout_ms) if result_obj: if ( 'result' in result_obj and len(result_obj['result']) == 1 and 'did' in result_obj['result'][0] and result_obj['result'][0]['did'] == did and 'code' in result_obj['result'][0] ): return result_obj['result'][0] if 'code' in result_obj: return result_obj return { 'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value, 'message': 'Invalid result'} @final async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 ) -> dict: result_obj = await self.__call_api_async( did=did, msg={ 'method': 'action', 'params': { 'did': did, 'siid': siid, 'aiid': aiid, 'in': in_list} }, timeout_ms=timeout_ms) if result_obj: if 'result' in result_obj and 'code' in result_obj['result']: return result_obj['result'] if 'code' in result_obj: return result_obj return { 'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value, 'message': 'Invalid result'} @final async def get_dev_list_async( self, timeout_ms: int = 10000 ) -> dict[str, dict]: if not self._init_done: return {} def get_device_list_handler(msg: dict, fut: asyncio.Future): self._main_loop.call_soon_threadsafe( fut.set_result, msg) fut: asyncio.Future = self._main_loop.create_future() if self.__lan_send_cmd( MIoTLanCmdType.GET_DEV_LIST, MIoTLanGetDevListData( handler=get_device_list_handler, handler_ctx=fut, timeout_ms=timeout_ms)): return await fut _LOGGER.error('get_dev_list_async error, send cmd failed') fut.set_result({}) return await fut def ping(self, if_name: str, target_ip: str) -> None: if not target_ip: return self.__sendto( if_name=if_name, data=self._probe_msg, address=target_ip, port=self.OT_PORT) def send2device( self, did: str, msg: dict, handler: Optional[Callable[[dict, any], None]] = None, handler_ctx: any = None, timeout_ms: Optional[int] = None ) -> None: if timeout_ms and not handler: raise ValueError('handler is required when timeout_ms is set') device: MIoTLanDevice = self._lan_devices.get(did) if not device: raise ValueError('invalid device') if not device.cipher: raise ValueError('invalid device cipher') if not device.if_name: raise ValueError('invalid device if_name') if not device.ip: raise ValueError('invalid device ip') in_msg = {'id': self.__gen_msg_id(), **msg} msg_len = device.gen_packet( out_buffer=self._write_buffer, clear_data=in_msg, did=did, offset=int(time.time())-device.offset) return self.make_request( msg_id=in_msg['id'], msg=self._write_buffer[0: msg_len], if_name=device.if_name, ip=device.ip, handler=handler, handler_ctx=handler_ctx, timeout_ms=timeout_ms) def make_request( self, msg_id: int, msg: bytearray, if_name: str, ip: str, handler: Callable[[dict, any], None], handler_ctx: any = None, timeout_ms: Optional[int] = None ) -> None: def request_timeout_handler(req_data: MIoTLanRequestData): self._pending_requests.pop(req_data.msg_id, None) if req_data: req_data.handler({ 'code': MIoTErrorCode.CODE_TIMEOUT.value, 'error': 'timeout'}, req_data.handler_ctx) timer: Optional[TimeoutHandle] = None request_data = MIoTLanRequestData( msg_id=msg_id, handler=handler, handler_ctx=handler_ctx, timeout=timer) if timeout_ms: timer = self._mev.set_timeout( timeout_ms, request_timeout_handler, request_data) request_data.timeout = timer self._pending_requests[msg_id] = request_data self.__sendto(if_name=if_name, data=msg, address=ip, port=self.OT_PORT) def broadcast_device_state(self, did: str, state: dict) -> None: for handler in self._device_state_sub_map.values(): self._main_loop.call_soon_threadsafe( self._main_loop.create_task, handler.handler(did, state, handler.handler_ctx)) def __gen_msg_id(self) -> int: if not self._msg_id_counter: self._msg_id_counter = int(random.random()*0x7FFFFFFF) self._msg_id_counter += 1 if self._msg_id_counter > 0x80000000: self._msg_id_counter = 1 return self._msg_id_counter def __lan_send_cmd(self, cmd: MIoTLanCmd, data: any) -> bool: try: self._queue.put(MIoTLanCmd(type_=cmd, data=data)) os.write(self._cmd_event_fdw, '1'.encode()) return True except Exception as err: # pylint: disable=broad-exception-caught _LOGGER.error('send cmd error, %s, %s', cmd, err) return False async def __call_api_async( self, did: str, msg: dict, timeout_ms: int = 10000 ) -> dict: def call_api_handler(msg: dict, fut: asyncio.Future): self._main_loop.call_soon_threadsafe( fut.set_result, msg) fut: asyncio.Future = self._main_loop.create_future() if self.__lan_send_cmd( cmd=MIoTLanCmdType.CALL_API, data=MIoTLanCallApiData( did=did, msg=msg, handler=call_api_handler, handler_ctx=fut, timeout_ms=timeout_ms)): return await fut fut.set_result({ 'code': MIoTErrorCode.CODE_UNAVAILABLE.value, 'error': 'send cmd error'}) return await fut def __lan_thread_handler(self) -> None: _LOGGER.info('miot lan thread start') self.__init_socket() # Create scan devices timer self._scan_timer = self._mev.set_timeout( int(3000*random.random()), self.__scan_devices, None) self._mev.loop_forever() _LOGGER.info('miot lan thread exit') def __cmd_read_handler(self, ctx: any) -> None: fd_value = os.read(self._cmd_event_fdr,1) if fd_value == 0: return while not self._queue.empty(): mips_cmd: MIoTLanCmd = self._queue.get(block=False) if mips_cmd.type_ == MIoTLanCmdType.CALL_API: call_api_data: MIoTLanCallApiData = mips_cmd.data try: self.send2device( did=call_api_data.did, msg={'from': 'ha.xiaomi_home', **call_api_data.msg}, handler=call_api_data.handler, handler_ctx=call_api_data.handler_ctx, timeout_ms=call_api_data.timeout_ms) except Exception as err: # pylint: disable=broad-exception-caught _LOGGER.error('send2device error, %s', err) call_api_data.handler({ 'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value, 'error': str(err)}, call_api_data.handler_ctx) elif mips_cmd.type_ == MIoTLanCmdType.SUB_DEVICE_STATE: sub_data: MIoTLanSubDeviceState = mips_cmd.data self._device_state_sub_map[sub_data.key] = sub_data elif mips_cmd.type_ == MIoTLanCmdType.UNSUB_DEVICE_STATE: sub_data: MIoTLanUnsubDeviceState = mips_cmd.data self._device_state_sub_map.pop(sub_data.key, None) elif mips_cmd.type_ == MIoTLanCmdType.REG_BROADCAST: reg_data: MIoTLanRegisterBroadcastData = mips_cmd.data self._device_msg_matcher[reg_data.key] = reg_data _LOGGER.debug('lan register broadcast, %s', reg_data.key) elif mips_cmd.type_ == MIoTLanCmdType.UNREG_BROADCAST: unreg_data: MIoTLanUnregisterBroadcastData = mips_cmd.data if self._device_msg_matcher.get(topic=unreg_data.key): del self._device_msg_matcher[unreg_data.key] _LOGGER.debug('lan unregister broadcast, %s', unreg_data.key) elif mips_cmd.type_ == MIoTLanCmdType.GET_DEV_LIST: get_dev_list_data: MIoTLanGetDevListData = mips_cmd.data dev_list = { device.did: { 'online': device.online, 'push_available': device.subscribed } for device in self._lan_devices.values() if device.online} get_dev_list_data.handler( dev_list, get_dev_list_data.handler_ctx) elif mips_cmd.type_ == MIoTLanCmdType.DEVICE_UPDATE: devices: dict[str, dict] = mips_cmd.data for did, info in devices.items(): if did not in self._lan_devices: if 'token' not in info: _LOGGER.error( 'token not found, %s, %s', did, info) continue if len(info['token']) != 32: _LOGGER.error( 'invalid device token, %s, %s', did, info) continue self._lan_devices[did] = MIoTLanDevice( manager=self, did=did, token=info['token'], ip=info.get('ip', None)) else: self._lan_devices[did].update_info(info) elif mips_cmd.type_ == MIoTLanCmdType.DEVICE_DELETE: device_dids: list[str] = mips_cmd.data for did in device_dids: lan_device = self._lan_devices.pop(did, None) if not lan_device: continue lan_device.on_delete() elif mips_cmd.type_ == MIoTLanCmdType.NET_INFO_UPDATE: net_data: MIoTLanNetworkUpdateData = mips_cmd.data if net_data.status == InterfaceStatus.ADD: self._available_net_ifs.add(net_data.if_name) if net_data.if_name in self._net_ifs: self.__create_socket(if_name=net_data.if_name) elif net_data.status == InterfaceStatus.REMOVE: self._available_net_ifs.remove(net_data.if_name) self.__destroy_socket(if_name=net_data.if_name) elif mips_cmd.type_ == MIoTLanCmdType.NET_IFS_UPDATE: net_ifs: list[str] = mips_cmd.data if self._net_ifs != set(net_ifs): self._net_ifs = set(net_ifs) for if_name in self._net_ifs: self.__create_socket(if_name=if_name) for if_name in list(self._broadcast_socks.keys()): if if_name not in self._net_ifs: self.__destroy_socket(if_name=if_name) elif mips_cmd.type_ == MIoTLanCmdType.OPTIONS_UPDATE: options: dict = mips_cmd.data if 'enable_subscribe' in options: if options['enable_subscribe'] != self._enable_subscribe: self._enable_subscribe = options['enable_subscribe'] if not self._enable_subscribe: # Unsubscribe all for device in self._lan_devices.values(): device.unsubscribe() elif mips_cmd.type_ == MIoTLanCmdType.DEINIT: # stop the thread if self._scan_timer: self._mev.clear_timeout(self._scan_timer) self._scan_timer = None for device in self._lan_devices.values(): device.on_delete() self._lan_devices.clear() for req_data in self._pending_requests.values(): self._mev.clear_timeout(req_data.timeout) self._pending_requests.clear() for timer in self._reply_msg_buffer.values(): self._mev.clear_timeout(timer) self._reply_msg_buffer.clear() self._device_msg_matcher = MIoTMatcher() self.__deinit_socket() self._mev.loop_stop() def __init_socket(self) -> None: self.__deinit_socket() for if_name in self._net_ifs: if if_name not in self._available_net_ifs: return self.__create_socket(if_name=if_name) def __create_socket(self, if_name: str) -> None: if if_name in self._broadcast_socks: _LOGGER.info('socket already created, %s', if_name) return # Create socket try: sock = socket.socket( socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Set SO_BINDTODEVICE sock.setsockopt( socket.SOL_SOCKET, socket.SO_BINDTODEVICE, if_name.encode()) sock.bind(('', self._local_port or 0)) self._mev.set_read_handler( sock.fileno(), self.__socket_read_handler, (if_name, sock)) self._broadcast_socks[if_name] = sock self._local_port = self._local_port or sock.getsockname()[1] _LOGGER.info( 'created socket, %s, %s', if_name, self._local_port) except Exception as err: # pylint: disable=broad-exception-caught _LOGGER.error('create socket error, %s, %s', if_name, err) def __deinit_socket(self) -> None: for if_name in list(self._broadcast_socks.keys()): self.__destroy_socket(if_name) self._broadcast_socks.clear() def __destroy_socket(self, if_name: str) -> None: sock = self._broadcast_socks.pop(if_name, None) if not sock: return self._mev.set_read_handler(sock.fileno(), None, None) sock.close() _LOGGER.info('destroyed socket, %s', if_name) def __socket_read_handler(self, ctx: tuple[str, socket.socket]) -> None: try: data_len, addr = ctx[1].recvfrom_into( self._read_buffer, self.OT_MSG_LEN, socket.MSG_DONTWAIT) if data_len < 0: # Socket error _LOGGER.error('socket read error, %s, %s', ctx[0], data_len) return if addr[1] != self.OT_PORT: # Not ot msg return self.__raw_message_handler( self._read_buffer[:data_len], data_len, addr[0], ctx[0]) except Exception as err: # pylint: disable=broad-exception-caught _LOGGER.error('socket read handler error, %s', err) def __raw_message_handler( self, data: bytearray, data_len: int, ip: str, if_name: str ) -> None: if data[:2] != self.OT_HEADER: return # Keep alive message did: str = str(struct.unpack('>Q', data[4:12])[0]) device: MIoTLanDevice = self._lan_devices.get(did) if not device: return timestamp: int = struct.unpack('>I', data[12:16])[0] device.offset = int(time.time()) - timestamp # Keep alive if this is a probe if data_len == self.OT_PROBE_LEN or device.subscribed: device.keep_alive(ip=ip, if_name=if_name) # Manage device subscribe status if ( self._enable_subscribe and data_len == self.OT_PROBE_LEN and data[16:20] == b'MSUB' and data[24:27] == b'PUB' ): device.supported_wildcard_sub = ( int(data[28]) == self.OT_SUPPORT_WILDCARD_SUB) sub_ts = struct.unpack('>I', data[20:24])[0] sub_type = int(data[27]) if ( device.supported_wildcard_sub and sub_type in [0, 1, 4] and sub_ts != device.sub_ts ): device.subscribed = False device.subscribe() if data_len > self.OT_PROBE_LEN: # handle device message try: decrypted_data = device.decrypt_packet(data) self.__message_handler(did, decrypted_data) except Exception as err: # pylint: disable=broad-exception-caught _LOGGER.error('decrypt packet error, %s, %s', did, err) return def __message_handler(self, did: str, msg: dict) -> None: if 'id' not in msg: _LOGGER.warning('invalid message, no id, %s, %s', did, msg) return # Reply req: MIoTLanRequestData = self._pending_requests.pop(msg['id'], None) if req: self._mev.clear_timeout(req.timeout) self._main_loop.call_soon_threadsafe( req.handler, msg, req.handler_ctx) return # Handle up link message if 'method' not in msg or 'params' not in msg: _LOGGER.debug( 'invalid message, no method or params, %s, %s', did, msg) return # Filter dup message if self.__filter_dup_message(did, msg['id']): self.send2device( did=did, msg={'id': msg['id'], 'result': {'code': 0}}) return _LOGGER.debug('lan message, %s, %s', did, msg) if msg['method'] == 'properties_changed': for param in msg['params']: if 'siid' not in param and 'piid' not in param: _LOGGER.debug( 'invalid message, no siid or piid, %s, %s', did, msg) continue key = f'{did}/p/{param["siid"]}/{param["piid"]}' subs: list[MIoTLanRegisterBroadcastData] = list( self._device_msg_matcher.iter_match(key)) for sub in subs: self._main_loop.call_soon_threadsafe( sub.handler, param, sub.handler_ctx) elif ( msg['method'] == 'event_occured' and 'siid' in msg['params'] and 'eiid' in msg['params'] ): key = f'{did}/e/{msg["params"]["siid"]}/{msg["params"]["eiid"]}' subs: list[MIoTLanRegisterBroadcastData] = list( self._device_msg_matcher.iter_match(key)) for sub in subs: self._main_loop.call_soon_threadsafe( sub.handler, msg['params'], sub.handler_ctx) else: _LOGGER.debug( 'invalid message, unknown method, %s, %s', did, msg) # Reply self.send2device( did=did, msg={'id': msg['id'], 'result': {'code': 0}}) def __filter_dup_message(self, did: str, msg_id: int) -> bool: filter_id = f'{did}.{msg_id}' if filter_id in self._reply_msg_buffer: return True self._reply_msg_buffer[filter_id] = self._mev.set_timeout( 5000, lambda filter_id: self._reply_msg_buffer.pop(filter_id, None), filter_id) def __sendto( self, if_name: str, data: bytes, address: str, port: int ) -> None: if address == '255.255.255.255': # Broadcast for if_n, sock in self._broadcast_socks.items(): _LOGGER.debug('send broadcast, %s', if_n) sock.sendto(data, socket.MSG_DONTWAIT, (address, port)) else: # Unicast sock = self._broadcast_socks.get(if_name, None) if not sock: _LOGGER.error('invalid socket, %s', if_name) return sock.sendto(data, socket.MSG_DONTWAIT, (address, port)) def __scan_devices(self, ctx: any) -> None: if self._scan_timer: self._mev.clear_timeout(self._scan_timer) # Scan devices self.ping(if_name=None, target_ip='255.255.255.255') scan_time = self.__get_next_scan_time() self._scan_timer = self._mev.set_timeout( scan_time, self.__scan_devices, None) _LOGGER.debug('next scan time: %sms', scan_time) def __get_next_scan_time(self) -> int: if not self._last_scan_interval: self._last_scan_interval = self.OT_PROBE_INTERVAL_MIN self._last_scan_interval = min( self._last_scan_interval*2, self.OT_PROBE_INTERVAL_MAX) return self._last_scan_interval async def __on_network_info_change( self, status: InterfaceStatus, info: NetworkInfo ) -> None: _LOGGER.info( 'on network info change, status: %s, info: %s', status, info) available_net_ifs = set() for if_name in list(self._network.network_info.keys()): available_net_ifs.add(if_name) if len(available_net_ifs) == 0: await self.deinit_async() self._available_net_ifs = available_net_ifs return if self._net_ifs.isdisjoint(available_net_ifs): _LOGGER.info('no valid net_ifs') await self.deinit_async() self._available_net_ifs = available_net_ifs return if not self._init_done: self._available_net_ifs = available_net_ifs await self.init_async() return self.__lan_send_cmd( MIoTLanCmdType.NET_INFO_UPDATE, MIoTLanNetworkUpdateData( status=status, if_name=info.name)) async def __on_mips_service_change( self, group_id: str, state: MipsServiceState, data: dict ) -> None: _LOGGER.info( 'on mips service change, %s, %s, %s', group_id, state, data) if len(self._mips_service.get_services()) > 0: _LOGGER.info('find central service, deinit miot lan') await self.deinit_async() else: _LOGGER.info('no central service, init miot lan') await self.init_async()