# -*- coding: utf-8 -*- """ Copyright (C) 2024 Xiaomi Corporation. The ownership and intellectual property rights of Xiaomi Home Assistant Integration and related Xiaomi cloud service API interface provided under this license, including source code and object code (collectively, "Licensed Work"), are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi hereby grants you a personal, limited, non-exclusive, non-transferable, non-sublicensable, and royalty-free license to reproduce, use, modify, and distribute the Licensed Work only for your use of Home Assistant for non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize you to use the Licensed Work for any other purpose, including but not limited to use Licensed Work to develop applications (APP), Web services, and other forms of software. You may reproduce and distribute copies of the Licensed Work, with or without modifications, whether in source or object form, provided that you must give any other recipients of the Licensed Work a copy of this License and retain all copyright and disclaimers. Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible for any direct, indirect, special, incidental, or consequential damages or losses arising from the use or inability to use the Licensed Work. Xiaomi reserves all rights not expressly granted to you in this License. Except for the rights expressly granted by Xiaomi under this License, Xiaomi does not authorize you in any form to use the trademarks, copyrights, or other forms of intellectual property rights of Xiaomi and its affiliates, including, without limitation, without obtaining other written permission from Xiaomi, you shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that may make the public associate with Xiaomi in any form to publicize or promote the software or hardware devices that use the Licensed Work. Xiaomi has the right to immediately terminate all your authorization under this License in the event: 1. You assert patent invalidation, litigation, or other claims against patents or other intellectual property rights of Xiaomi or its affiliates; or, 2. You make, have made, manufacture, sell, or offer to sell products that knock off Xiaomi or its affiliates' products. MIoT Pub/Sub client. """ import asyncio import json import logging import os import queue import random import re import ssl import struct import threading from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum, auto from typing import Callable, Optional, final from paho.mqtt.client import ( MQTT_ERR_SUCCESS, MQTT_ERR_UNKNOWN, Client, MQTTv5) # pylint: disable=relative-beyond-top-level from .common import MIoTMatcher from .const import MIHOME_MQTT_KEEPALIVE from .miot_error import MIoTErrorCode, MIoTMipsError from .miot_ev import MIoTEventLoop, TimeoutHandle _LOGGER = logging.getLogger(__name__) class MipsMsgTypeOptions(Enum): """MIoT Pub/Sub message type.""" ID = 0 RET_TOPIC = auto() PAYLOAD = auto() FROM = auto() MAX = auto() class MipsMessage: """MIoT Pub/Sub message.""" mid: int = 0 msg_from: str = None ret_topic: str = None payload: str = None @staticmethod def unpack(data: bytes): mips_msg = MipsMessage() data_len = len(data) data_start = 0 data_end = 0 while data_start < data_len: data_end = data_start+5 unpack_len, unpack_type = struct.unpack( ' bytes: if mid is None or payload is None: raise MIoTMipsError('invalid mid or payload') pack_msg: bytes = b'' # mid pack_msg += struct.pack(' str: return f'{self.mid}, {self.msg_from}, {self.ret_topic}, {self.payload}' class MipsCmdType(Enum): """MIoT Pub/Sub command type.""" CONNECT = 0 DISCONNECT = auto() DEINIT = auto() SUB = auto() UNSUB = auto() CALL_API = auto() REG_BROADCAST = auto() UNREG_BROADCAST = auto() REG_MIPS_STATE = auto() UNREG_MIPS_STATE = auto() REG_DEVICE_STATE = auto() UNREG_DEVICE_STATE = auto() @dataclass class MipsCmd: """MIoT Pub/Sub command.""" type_: MipsCmdType data: any def __init__(self, type_: MipsCmdType, data: any) -> None: self.type_ = type_ self.data = data @dataclass class MipsRequest: """MIoT Pub/Sub request.""" mid: int = None on_reply: Callable[[str, any], None] = None on_reply_ctx: any = None timer: TimeoutHandle = None @dataclass class MipsRequestData: """MIoT Pub/Sub request data.""" topic: str = None payload: str = None on_reply: Callable[[str, any], None] = None on_reply_ctx: any = None timeout_ms: int = None @dataclass class MipsSendBroadcastData: """MIoT Pub/Sub send broadcast data.""" topic: str = None payload: str = None @dataclass class MipsIncomingApiCall: """MIoT Pub/Sub incoming API call.""" mid: int = None ret_topic: str = None timer: TimeoutHandle = None @dataclass class MipsApi: """MIoT Pub/Sub API.""" topic: str = None """ param1: session param2: payload param3: handler_ctx """ handler: Callable[[MipsIncomingApiCall, str, any], None] = None handler_ctx: any = None class MipsRegApi(MipsApi): """.MIoT Pub/Sub register API.""" @dataclass class MipsReplyData: """MIoT Pub/Sub reply data.""" session: MipsIncomingApiCall = None payload: str = None @dataclass class MipsBroadcast: """MIoT Pub/Sub broadcast.""" topic: str = None """ param 1: msg topic param 2: msg payload param 3: handle_ctx """ handler: Callable[[str, str, any], None] = None handler_ctx: any = None def __str__(self) -> str: return f'{self.topic}, {id(self.handler)}, {id(self.handler_ctx)}' class MipsRegBroadcast(MipsBroadcast): """MIoT Pub/Sub register broadcast.""" @dataclass class MipsState: """MIoT Pub/Sub state.""" key: str = None """ str: key bool: mips connect state any: ctx """ handler: Callable[[str, bool], asyncio.Future] = None class MipsRegState(MipsState): """MIoT Pub/Sub register state.""" class MIoTDeviceState(Enum): """MIoT device state define.""" DISABLE = 0 OFFLINE = auto() ONLINE = auto() @dataclass class MipsDeviceState: """MIoT Pub/Sub device state.""" did: str = None """handler str: did MIoTDeviceState: online/offline/disable any: ctx """ handler: Callable[[str, MIoTDeviceState, any], None] = None handler_ctx: any = None class MipsRegDeviceState(MipsDeviceState): """MIoT Pub/Sub register device state.""" class MipsClient(ABC): """MIoT Pub/Sub client.""" # pylint: disable=unused-argument MQTT_INTERVAL_MS = 1000 MIPS_QOS: int = 2 UINT32_MAX: int = 0xFFFFFFFF MIPS_RECONNECT_INTERVAL_MIN: int = 30000 MIPS_RECONNECT_INTERVAL_MAX: int = 600000 MIPS_SUB_PATCH: int = 300 MIPS_SUB_INTERVAL: int = 1000 main_loop: asyncio.AbstractEventLoop _logger: logging.Logger _client_id: str _host: str _port: int _username: str _password: str _ca_file: str _cert_file: str _key_file: str _mqtt_logger: logging.Logger _mqtt: Client _mqtt_fd: int _mqtt_timer: TimeoutHandle _mqtt_state: bool _event_connect: asyncio.Event _event_disconnect: asyncio.Event _mev: MIoTEventLoop _mips_thread: threading.Thread _mips_queue: queue.Queue _cmd_event_fdr: int _cmd_event_fdw: int _mips_reconnect_tag: bool _mips_reconnect_interval: int _mips_reconnect_timer: Optional[TimeoutHandle] _mips_state_sub_map: dict[str, MipsState] _mips_sub_pending_map: dict[str, int] _mips_sub_pending_timer: Optional[TimeoutHandle] _on_mips_cmd: Callable[[MipsCmd], None] _on_mips_message: Callable[[str, bytes], None] _on_mips_connect: Callable[[int, dict], None] _on_mips_disconnect: Callable[[int, dict], None] def __init__( self, client_id: str, host: str, port: int, username: str = None, password: str = None, ca_file: str = None, cert_file: str = None, key_file: str = None, loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: # MUST run with running loop self.main_loop = loop or asyncio.get_running_loop() self._logger = None self._client_id = client_id self._host = host self._port = port self._username = username self._password = password self._ca_file = ca_file self._cert_file = cert_file self._key_file = key_file self._mqtt_logger = None self._mqtt_fd = -1 self._mqtt_timer = None self._mqtt_state = False # mqtt init for API_VERSION2, # callback_api_version=CallbackAPIVersion.VERSION2, self._mqtt = Client(client_id=self._client_id, protocol=MQTTv5) self._mqtt.enable_logger(logger=self._mqtt_logger) # Mips init self._event_connect = asyncio.Event() self._event_disconnect = asyncio.Event() self._mips_reconnect_tag = False self._mips_reconnect_interval = 0 self._mips_reconnect_timer = None self._mips_state_sub_map = {} self._mips_sub_pending_map = {} self._mips_sub_pending_timer = None self._mev = MIoTEventLoop() self._mips_queue = queue.Queue() self._cmd_event_fdr, self._cmd_event_fdw = os.pipe() self.mev_set_read_handler( self._cmd_event_fdr, self.__mips_cmd_read_handler, None) self._mips_thread = threading.Thread(target=self.__mips_loop_thread) self._mips_thread.daemon = True self._mips_thread.name = self._client_id self._mips_thread.start() self._on_mips_cmd = None self._on_mips_message = None self._on_mips_connect = None self._on_mips_disconnect = None @property def client_id(self) -> str: return self._client_id @property def host(self) -> str: return self._host @property def port(self) -> int: return self._port @final @property def mips_state(self) -> bool: """mips connect state. Returns: bool: True: connected, False: disconnected """ return self._mqtt and self._mqtt.is_connected() @final def mips_deinit(self) -> None: self._mips_send_cmd(type_=MipsCmdType.DEINIT, data=None) self._mips_thread.join() self._mips_thread = None self._logger = None self._client_id = None self._host = None self._port = None self._username = None self._password = None self._ca_file = None self._cert_file = None self._key_file = None self._mqtt_logger = None self._mips_state_sub_map = None self._mips_sub_pending_map = None self._mips_sub_pending_timer = None self._event_connect = None self._event_disconnect = None def update_mqtt_password(self, password: str) -> None: self._password = password self._mqtt.username_pw_set( username=self._username, password=self._password) def log_debug(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.debug(f'{self._client_id}, '+msg, *args, **kwargs) def log_info(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.info(f'{self._client_id}, '+msg, *args, **kwargs) def log_error(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.error(f'{self._client_id}, '+msg, *args, **kwargs) def enable_logger(self, logger: Optional[logging.Logger] = None) -> None: self._logger = logger def enable_mqtt_logger( self, logger: Optional[logging.Logger] = None ) -> None: if logger: self._mqtt.enable_logger(logger=logger) else: self._mqtt.disable_logger() @final def mips_connect(self) -> None: """mips connect.""" return self._mips_send_cmd(type_=MipsCmdType.CONNECT, data=None) @final async def mips_connect_async(self) -> None: """mips connect async.""" self._mips_send_cmd(type_=MipsCmdType.CONNECT, data=None) return await self._event_connect.wait() @final def mips_disconnect(self) -> None: """mips disconnect.""" return self._mips_send_cmd(type_=MipsCmdType.DISCONNECT, data=None) @final async def mips_disconnect_async(self) -> None: """mips disconnect async.""" self._mips_send_cmd(type_=MipsCmdType.DISCONNECT, data=None) return await self._event_disconnect.wait() @final def sub_mips_state( self, key: str, handler: Callable[[str, bool], asyncio.Future] ) -> bool: """Subscribe mips state. NOTICE: callback to main loop thread """ if isinstance(key, str) is False or handler is None: raise MIoTMipsError('invalid params') return self._mips_send_cmd( type_=MipsCmdType.REG_MIPS_STATE, data=MipsRegState(key=key, handler=handler)) @final def unsub_mips_state(self, key: str) -> bool: """Unsubscribe mips state.""" if isinstance(key, str) is False: raise MIoTMipsError('invalid params') return self._mips_send_cmd( type_=MipsCmdType.UNREG_MIPS_STATE, data=MipsRegState(key=key)) @final def mev_set_timeout( self, timeout_ms: int, handler: Callable[[any], None], handler_ctx: any = None ) -> Optional[TimeoutHandle]: """set timeout. NOTICE: Internal function, only mips threads are allowed to call """ if self._mev is None: return None return self._mev.set_timeout( timeout_ms=timeout_ms, handler=handler, handler_ctx=handler_ctx) @final def mev_clear_timeout(self, handle: TimeoutHandle) -> None: """clear timeout. NOTICE: Internal function, only mips threads are allowed to call """ if self._mev is None: return self._mev.clear_timeout(handle) @final def mev_set_read_handler( self, fd: int, handler: Callable[[any], None], handler_ctx: any ) -> bool: """set read handler. NOTICE: Internal function, only mips threads are allowed to call """ if self._mev is None: return False return self._mev.set_read_handler( fd=fd, handler=handler, handler_ctx=handler_ctx) @final def mev_set_write_handler( self, fd: int, handler: Callable[[any], None], handler_ctx: any ) -> bool: """set write handler. NOTICE: Internal function, only mips threads are allowed to call """ if self._mev is None: return False return self._mev.set_write_handler( fd=fd, handler=handler, handler_ctx=handler_ctx) @property def on_mips_cmd(self) -> Callable[[MipsCmd], None]: return self._on_mips_cmd @on_mips_cmd.setter def on_mips_cmd(self, handler: Callable[[MipsCmd], None]) -> None: """MUST set after __init__ done. NOTICE thread safe, this function will be called at the **mips** thread """ self._on_mips_cmd = handler @property def on_mips_message(self) -> Callable[[str, bytes], None]: return self._on_mips_message @on_mips_message.setter def on_mips_message(self, handler: Callable[[str, bytes], None]) -> None: """MUST set after __init__ done. NOTICE thread safe, this function will be called at the **mips** thread """ self._on_mips_message = handler @property def on_mips_connect(self) -> Callable[[int, dict], None]: return self._on_mips_connect @on_mips_connect.setter def on_mips_connect(self, handler: Callable[[int, dict], None]) -> None: """MUST set after __init__ done. NOTICE thread safe, this function will be called at the **main loop** thread """ self._on_mips_connect = handler @property def on_mips_disconnect(self) -> Callable[[int, dict], None]: return self._on_mips_disconnect @on_mips_disconnect.setter def on_mips_disconnect(self, handler: Callable[[int, dict], None]) -> None: """MUST set after __init__ done. NOTICE thread safe, this function will be called at the **main loop** thread """ self._on_mips_disconnect = handler @abstractmethod def sub_prop( self, did: str, handler: Callable[[dict, any], None], siid: int = None, piid: int = None, handler_ctx: any = None ) -> bool: ... @abstractmethod def unsub_prop( self, did: str, siid: int = None, piid: int = None ) -> bool: ... @abstractmethod def sub_event( self, did: str, handler: Callable[[dict, any], None], siid: int = None, eiid: int = None, handler_ctx: any = None ) -> bool: ... @abstractmethod def unsub_event( self, did: str, siid: int = None, eiid: int = None ) -> bool: ... @abstractmethod async def get_dev_list_async( self, payload: str = None, timeout_ms: int = 10000 ) -> dict[str, dict]: ... @abstractmethod async def get_prop_async( self, did: str, siid: int, piid: int, timeout_ms: int = 10000 ) -> any: ... @abstractmethod async def set_prop_async( self, did: str, siid: int, piid: int, value: any, timeout_ms: int = 10000 ) -> bool: ... @abstractmethod async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 ) -> tuple[bool, list]: ... @final def _mips_sub_internal(self, topic: str) -> None: """mips subscribe. NOTICE: Internal function, only mips threads are allowed to call """ self.__thread_check() if not self._mqtt or not self._mqtt.is_connected(): return try: if topic not in self._mips_sub_pending_map: self._mips_sub_pending_map[topic] = 0 if not self._mips_sub_pending_timer: self._mips_sub_pending_timer = self.mev_set_timeout( 10, self.__mips_sub_internal_pending_handler, topic) except Exception as err: # pylint: disable=broad-exception-caught # Catch all exception self.log_error(f'mips sub internal error, {topic}. {err}') @final def _mips_unsub_internal(self, topic: str) -> None: """mips unsubscribe. NOTICE: Internal function, only mips threads are allowed to call """ self.__thread_check() if not self._mqtt or not self._mqtt.is_connected(): return try: result, mid = self._mqtt.unsubscribe(topic=topic) if result == MQTT_ERR_SUCCESS: self.log_debug( f'mips unsub internal success, {result}, {mid}, {topic}') return self.log_error( f'mips unsub internal error, {result}, {mid}, {topic}') except Exception as err: # pylint: disable=broad-exception-caught # Catch all exception self.log_error(f'mips unsub internal error, {topic}, {err}') @final def _mips_publish_internal( self, topic: str, payload: str | bytes, wait_for_publish: bool = False, timeout_ms: int = 10000 ) -> bool: """mips publish message. NOTICE: Internal function, only mips threads are allowed to call """ self.__thread_check() if not self._mqtt or not self._mqtt.is_connected(): return False try: handle = self._mqtt.publish( topic=topic, payload=payload, qos=self.MIPS_QOS) # self.log_debug(f'_mips_publish_internal, {topic}, {payload}') if wait_for_publish is True: handle.wait_for_publish(timeout_ms/1000.0) return True except Exception as err: # pylint: disable=broad-exception-caught # Catch other exception self.log_error(f'mips publish internal error, {err}') return False @final def _mips_send_cmd(self, type_: MipsCmdType, data: any) -> bool: if self._mips_queue is None or self._cmd_event_fdw is None: raise MIoTMipsError('send mips cmd disable') # Put data to queue self._mips_queue.put(MipsCmd(type_=type_, data=data)) # Write event fd os.write(self._cmd_event_fdw, '1'.encode()) # self.log_debug(f'send mips cmd, {type}, {data}') return True def __thread_check(self) -> None: if threading.current_thread() is not self._mips_thread: raise MIoTMipsError('illegal call') def __mips_cmd_read_handler(self, ctx: any) -> None: fd_value = os.read(self._cmd_event_fdr,1) if fd_value == 0: return while self._mips_queue.empty() is False: mips_cmd: MipsCmd = self._mips_queue.get(block=False) if mips_cmd.type_ == MipsCmdType.CONNECT: self._mips_reconnect_tag = True self.__mips_try_reconnect(immediately=True) elif mips_cmd.type_ == MipsCmdType.DISCONNECT: self._mips_reconnect_tag = False self.__mips_disconnect() elif mips_cmd.type_ == MipsCmdType.DEINIT: self.log_info('mips client recv deinit cmd') self.__mips_disconnect() # Close cmd event fd if self._cmd_event_fdr: self.mev_set_read_handler( self._cmd_event_fdr, None, None) os.close(self._cmd_event_fdr) self._cmd_event_fdr = None if self._mips_queue: self._mips_queue = None # ev loop stop if self._mev: self._mev.loop_stop() self._mev = None break elif mips_cmd.type_ == MipsCmdType.REG_MIPS_STATE: state: MipsState = mips_cmd.data self._mips_state_sub_map[state.key] = state self.log_debug(f'mips register mips state, {state.key}') elif mips_cmd.type_ == MipsCmdType.UNREG_MIPS_STATE: state: MipsState = mips_cmd.data del self._mips_state_sub_map[state.key] self.log_debug(f'mips unregister mips state, {state.key}') else: if self._on_mips_cmd: self._on_mips_cmd(mips_cmd=mips_cmd) def __mqtt_read_handler(self, ctx: any) -> None: self.__mqtt_loop_handler(ctx=ctx) def __mqtt_write_handler(self, ctx: any) -> None: self.mev_set_write_handler(self._mqtt_fd, None, None) self.__mqtt_loop_handler(ctx=ctx) def __mqtt_timer_handler(self, ctx: any) -> None: self.__mqtt_loop_handler(ctx=ctx) if self._mqtt: self._mqtt_timer = self.mev_set_timeout( self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None) def __mqtt_loop_handler(self, ctx: any) -> None: try: if self._mqtt: self._mqtt.loop_read() if self._mqtt: self._mqtt.loop_write() if self._mqtt: self._mqtt.loop_misc() if self._mqtt and self._mqtt.want_write(): self.mev_set_write_handler( self._mqtt_fd, self.__mqtt_write_handler, None) except Exception as err: # pylint: disable=broad-exception-caught # Catch all exception self.log_error(f'__mqtt_loop_handler, {err}') raise err def __mips_loop_thread(self) -> None: self.log_info('mips_loop_thread start') # Set mqtt config if self._username: self._mqtt.username_pw_set( username=self._username, password=self._password) if ( self._ca_file and self._cert_file and self._key_file ): self._mqtt.tls_set( tls_version=ssl.PROTOCOL_TLS_CLIENT, ca_certs=self._ca_file, certfile=self._cert_file, keyfile=self._key_file) else: self._mqtt.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT) self._mqtt.tls_insecure_set(True) self._mqtt.on_connect = self.__on_connect self._mqtt.on_connect_fail = self.__on_connect_failed self._mqtt.on_disconnect = self.__on_disconnect self._mqtt.on_message = self.__on_message # Run event loop self._mev.loop_forever() self.log_info('mips_loop_thread exit!') def __on_connect(self, client, user_data, flags, rc, props) -> None: if not self._mqtt.is_connected(): return self.log_info(f'mips connect, {flags}, {rc}, {props}') self._mqtt_state = True if self._on_mips_connect: self.mev_set_timeout( timeout_ms=0, handler=lambda ctx: self._on_mips_connect(rc, props)) for item in self._mips_state_sub_map.values(): if item.handler is None: continue self.main_loop.call_soon_threadsafe( self.main_loop.create_task, item.handler(item.key, True)) # Resolve future self._event_connect.set() self._event_disconnect.clear() def __on_connect_failed(self, client, user_data, flags, rc) -> None: self.log_error(f'mips connect failed, {flags}, {rc}') # Try to reconnect self.__mips_try_reconnect() def __on_disconnect(self, client, user_data, rc, props) -> None: if self._mqtt_state: self.log_error(f'mips disconnect, {rc}, {props}') self._mqtt_state = False if self._mqtt_timer: self.mev_clear_timeout(self._mqtt_timer) self._mqtt_timer = None if self._mqtt_fd != -1: self.mev_set_read_handler(self._mqtt_fd, None, None) self.mev_set_write_handler(self._mqtt_fd, None, None) self._mqtt_fd = -1 # Clear retry sub if self._mips_sub_pending_timer: self.mev_clear_timeout(self._mips_sub_pending_timer) self._mips_sub_pending_timer = None self._mips_sub_pending_map = {} if self._on_mips_disconnect: self.mev_set_timeout( timeout_ms=0, handler=lambda ctx: self._on_mips_disconnect(rc, props)) # Call state sub handler for item in self._mips_state_sub_map.values(): if item.handler is None: continue self.main_loop.call_soon_threadsafe( self.main_loop.create_task, item.handler(item.key, False)) # Try to reconnect self.__mips_try_reconnect() # Set event self._event_disconnect.set() self._event_connect.clear() def __on_message(self, client, user_data, msg) -> None: self._on_mips_message(topic=msg.topic, payload=msg.payload) def __mips_try_reconnect(self, immediately: bool = False) -> None: if self._mips_reconnect_timer: self.mev_clear_timeout(self._mips_reconnect_timer) self._mips_reconnect_timer = None if not self._mips_reconnect_tag: return interval: int = 0 if not immediately: interval = self.__get_next_reconnect_time() self.log_error( 'mips try reconnect after %sms', interval) self._mips_reconnect_timer = self.mev_set_timeout( interval, self.__mips_connect, None) def __mips_sub_internal_pending_handler(self, ctx: any) -> None: subbed_count = 1 for topic in list(self._mips_sub_pending_map.keys()): if subbed_count > self.MIPS_SUB_PATCH: break count = self._mips_sub_pending_map[topic] if count > 3: self._mips_sub_pending_map.pop(topic) self.log_error(f'retry mips sub internal error, {topic}') continue subbed_count += 1 result, mid = self._mqtt.subscribe(topic, qos=self.MIPS_QOS) if result == MQTT_ERR_SUCCESS: self._mips_sub_pending_map.pop(topic) self.log_debug(f'mips sub internal success, {topic}') continue self._mips_sub_pending_map[topic] = count+1 self.log_error( f'retry mips sub internal, {count}, {topic}, {result}, {mid}') if len(self._mips_sub_pending_map): self._mips_sub_pending_timer = self.mev_set_timeout( self.MIPS_SUB_INTERVAL, self.__mips_sub_internal_pending_handler, None) else: self._mips_sub_pending_timer = None def __mips_connect(self, ctx: any = None) -> None: result = MQTT_ERR_UNKNOWN if self._mips_reconnect_timer: self.mev_clear_timeout(self._mips_reconnect_timer) self._mips_reconnect_timer = None try: # Try clean mqtt fd before mqtt connect if self._mqtt_timer: self.mev_clear_timeout(self._mqtt_timer) self._mqtt_timer = None if self._mqtt_fd != -1: self.mev_set_read_handler(self._mqtt_fd, None, None) self.mev_set_write_handler(self._mqtt_fd, None, None) self._mqtt_fd = -1 result = self._mqtt.connect( host=self._host, port=self._port, clean_start=True, keepalive=MIHOME_MQTT_KEEPALIVE) self.log_info(f'__mips_connect success, {result}') except (TimeoutError, OSError) as error: self.log_error('__mips_connect, connect error, %s', error) if result == MQTT_ERR_SUCCESS: self._mqtt_fd = self._mqtt.socket() self.log_debug(f'__mips_connect, _mqtt_fd, {self._mqtt_fd}') self.mev_set_read_handler( self._mqtt_fd, self.__mqtt_read_handler, None) if self._mqtt.want_write(): self.mev_set_write_handler( self._mqtt_fd, self.__mqtt_write_handler, None) self._mqtt_timer = self.mev_set_timeout( self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None) else: self.log_error(f'__mips_connect error result, {result}') self.__mips_try_reconnect() def __mips_disconnect(self) -> None: if self._mips_reconnect_timer: self.mev_clear_timeout(self._mips_reconnect_timer) self._mips_reconnect_timer = None if self._mqtt_timer: self.mev_clear_timeout(self._mqtt_timer) self._mqtt_timer = None if self._mqtt_fd != -1: self.mev_set_read_handler(self._mqtt_fd, None, None) self.mev_set_write_handler(self._mqtt_fd, None, None) self._mqtt_fd = -1 self._mqtt.disconnect() def __get_next_reconnect_time(self) -> int: if self._mips_reconnect_interval == 0: self._mips_reconnect_interval = self.MIPS_RECONNECT_INTERVAL_MIN else: self._mips_reconnect_interval = min( self._mips_reconnect_interval*2, self.MIPS_RECONNECT_INTERVAL_MAX) return self._mips_reconnect_interval class MipsCloudClient(MipsClient): """MIoT Pub/Sub Cloud Client.""" # pylint: disable=unused-argument # pylint: disable=inconsistent-quotes _msg_matcher: MIoTMatcher def __init__( self, uuid: str, cloud_server: str, app_id: str, token: str, port: int = 8883, loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: self._msg_matcher = MIoTMatcher() super().__init__( client_id=f'ha.{uuid}', host=f'{cloud_server}-ha.mqtt.io.mi.com', port=port, username=app_id, password=token, loop=loop) self.on_mips_cmd = self.__on_mips_cmd_handler self.on_mips_message = self.__on_mips_message_handler self.on_mips_connect = self.__on_mips_connect_handler self.on_mips_disconnect = self.__on_mips_disconnect_handler def deinit(self) -> None: self.mips_deinit() self._msg_matcher = None self.on_mips_cmd = None self.on_mips_message = None self.on_mips_connect = None @final def connect(self) -> None: self.mips_connect() @final async def connect_async(self) -> None: await self.mips_connect_async() @final def disconnect(self) -> None: self.mips_disconnect() self._msg_matcher = MIoTMatcher() @final async def disconnect_async(self) -> None: await self.mips_disconnect_async() self._msg_matcher = MIoTMatcher() def update_access_token(self, access_token: str) -> bool: if not isinstance(access_token, str): raise MIoTMipsError('invalid token') return self.update_mqtt_password(password=access_token) @final def sub_prop( self, did: str, handler: Callable[[dict, any], None], siid: int = None, piid: int = None, handler_ctx: any = None ) -> bool: if not isinstance(did, str) or handler is None: raise MIoTMipsError('invalid params') topic: str = ( f'device/{did}/up/properties_changed/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') def on_prop_msg(topic: str, payload: str, ctx: any) -> bool: try: msg: dict = json.loads(payload) except json.JSONDecodeError: self.log_error( f'on_prop_msg, invalid msg, {topic}, {payload}') return if ( not isinstance(msg.get('params', None), dict) or 'siid' not in msg['params'] or 'piid' not in msg['params'] or 'value' not in msg['params'] ): self.log_error( f'on_prop_msg, invalid msg, {topic}, {payload}') return if handler: self.log_debug('on properties_changed, %s', payload) handler(msg['params'], ctx) return self.__reg_broadcast( topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx) @final def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') topic: str = ( f'device/{did}/up/properties_changed/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') return self.__unreg_broadcast(topic=topic) @final def sub_event( self, did: str, handler: Callable[[dict, any], None], siid: int = None, eiid: int = None, handler_ctx: any = None ) -> bool: if not isinstance(did, str) or handler is None: raise MIoTMipsError('invalid params') # Spelling error: event_occured topic: str = ( f'device/{did}/up/event_occured/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') def on_event_msg(topic: str, payload: str, ctx: any) -> bool: try: msg: dict = json.loads(payload) except json.JSONDecodeError: self.log_error( f'on_event_msg, invalid msg, {topic}, {payload}') return if ( not isinstance(msg.get('params', None), dict) or 'siid' not in msg['params'] or 'eiid' not in msg['params'] or 'arguments' not in msg['params'] ): self.log_error( f'on_event_msg, invalid msg, {topic}, {payload}') return if handler: self.log_debug('on on_event_msg, %s', payload) msg['params']['from'] = 'cloud' handler(msg['params'], ctx) return self.__reg_broadcast( topic=topic, handler=on_event_msg, handler_ctx=handler_ctx) @final def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') # Spelling error: event_occured topic: str = ( f'device/{did}/up/event_occured/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') return self.__unreg_broadcast(topic=topic) @final def sub_device_state( self, did: str, handler: Callable[[str, MIoTDeviceState, any], None], handler_ctx: any = None ) -> bool: """subscribe online state.""" if not isinstance(did, str) or handler is None: raise MIoTMipsError('invalid params') topic: str = f'device/{did}/state/#' def on_state_msg(topic: str, payload: str, ctx: any) -> None: msg: dict = json.loads(payload) # {"device_id":"xxxx","device_name":"米家智能插座3 ","event":"online", # "model": "cuco.plug.v3","timestamp":1709001070828,"uid":xxxx} if msg is None or 'device_id' not in msg or 'event' not in msg: self.log_error(f'on_state_msg, recv unknown msg, {payload}') return if msg['device_id'] != did: self.log_error( f'on_state_msg, err msg, {did}!={msg["device_id"]}') return if handler: self.log_debug('cloud, device state changed, %s', payload) handler( did, MIoTDeviceState.ONLINE if msg['event'] == 'online' else MIoTDeviceState.OFFLINE, ctx) return self.__reg_broadcast( topic=topic, handler=on_state_msg, handler_ctx=handler_ctx) @final def unsub_device_state(self, did: str) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') topic: str = f'device/{did}/state/#' return self.__unreg_broadcast(topic=topic) async def get_dev_list_async( self, payload: str = None, timeout_ms: int = 10000 ) -> dict[str, dict]: raise NotImplementedError('please call in http client') async def get_prop_async( self, did: str, siid: int, piid: int, timeout_ms: int = 10000 ) -> any: raise NotImplementedError('please call in http client') async def set_prop_async( self, did: str, siid: int, piid: int, value: any, timeout_ms: int = 10000 ) -> bool: raise NotImplementedError('please call in http client') async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 ) -> tuple[bool, list]: raise NotImplementedError('please call in http client') def __on_mips_cmd_handler(self, mips_cmd: MipsCmd) -> None: """ NOTICE thread safe, this function will be called at the **mips** thread """ if mips_cmd.type_ == MipsCmdType.REG_BROADCAST: reg_bc: MipsRegBroadcast = mips_cmd.data if not self._msg_matcher.get(topic=reg_bc.topic): sub_bc: MipsBroadcast = MipsBroadcast( topic=reg_bc.topic, handler=reg_bc.handler, handler_ctx=reg_bc.handler_ctx) self._msg_matcher[reg_bc.topic] = sub_bc self._mips_sub_internal(topic=reg_bc.topic) else: self.log_debug(f'mips cloud re-reg broadcast, {reg_bc.topic}') elif mips_cmd.type_ == MipsCmdType.UNREG_BROADCAST: unreg_bc: MipsRegBroadcast = mips_cmd.data if self._msg_matcher.get(topic=unreg_bc.topic): del self._msg_matcher[unreg_bc.topic] self._mips_unsub_internal(topic=unreg_bc.topic) def __reg_broadcast( self, topic: str, handler: Callable[[str, str, any], None], handler_ctx: any = None ) -> bool: return self._mips_send_cmd( type_=MipsCmdType.REG_BROADCAST, data=MipsRegBroadcast( topic=topic, handler=handler, handler_ctx=handler_ctx)) def __unreg_broadcast(self, topic: str) -> bool: return self._mips_send_cmd( type_=MipsCmdType.UNREG_BROADCAST, data=MipsRegBroadcast(topic=topic)) def __on_mips_connect_handler(self, rc, props) -> None: """sub topic.""" for topic, _ in list( self._msg_matcher.iter_all_nodes()): self._mips_sub_internal(topic=topic) def __on_mips_disconnect_handler(self, rc, props) -> None: """unsub topic.""" pass def __on_mips_message_handler(self, topic: str, payload) -> None: """ NOTICE thread safe, this function will be called at the **mips** thread """ # broadcast bc_list: list[MipsBroadcast] = list( self._msg_matcher.iter_match(topic)) if not bc_list: return # self.log_debug(f"on broadcast, {topic}, {payload}") for item in bc_list or []: if item.handler is None: continue # NOTICE: call threadsafe self.main_loop.call_soon_threadsafe( item.handler, topic, payload, item.handler_ctx) class MipsLocalClient(MipsClient): """MIoT Pub/Sub Local Client.""" # pylint: disable=unused-argument # pylint: disable=inconsistent-quotes MIPS_RECONNECT_INTERVAL_MIN: int = 6000 MIPS_RECONNECT_INTERVAL_MAX: int = 60000 MIPS_SUB_PATCH: int = 1000 MIPS_SUB_INTERVAL: int = 100 _did: str _group_id: str _home_name: str _mips_seed_id: int _reply_topic: str _dev_list_change_topic: str _request_map: dict[str, MipsRequest] _msg_matcher: MIoTMatcher _device_state_sub_map: dict[str, MipsDeviceState] _get_prop_queue: dict[str, list] _get_prop_timer: asyncio.TimerHandle _on_dev_list_changed: Callable[[any, list[str]], asyncio.Future] def __init__( self, did: str, host: str, group_id: str, ca_file: str, cert_file: str, key_file: str, port: int = 8883, home_name: str = '', loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: self._did = did self._group_id = group_id self._home_name = home_name self._mips_seed_id = random.randint(0, self.UINT32_MAX) self._reply_topic = f'{did}/reply' self._dev_list_change_topic = f'{did}/appMsg/devListChange' self._request_map = {} self._msg_matcher = MIoTMatcher() self._device_state_sub_map = {} self._get_prop_queue = {} self._get_prop_timer = None self._on_dev_list_changed = None super().__init__( client_id=did, host=host, port=port, ca_file=ca_file, cert_file=cert_file, key_file=key_file, loop=loop) # MIPS local thread name use group_id self._mips_thread.name = self._group_id self.on_mips_cmd = self.__on_mips_cmd_handler self.on_mips_message = self.__on_mips_message_handler self.on_mips_connect = self.__on_mips_connect_handler @property def group_id(self) -> str: return self._group_id def deinit(self) -> None: self.mips_deinit() self._did = None self._mips_seed_id = None self._reply_topic = None self._dev_list_change_topic = None self._request_map = None self._msg_matcher = None self._device_state_sub_map = None self._get_prop_queue = None self._get_prop_timer = None self._on_dev_list_changed = None self.on_mips_cmd = None self.on_mips_message = None self.on_mips_connect = None def log_debug(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.debug(f'{self._home_name}, '+msg, *args, **kwargs) def log_info(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.info(f'{self._home_name}, '+msg, *args, **kwargs) def log_error(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.error(f'{self._home_name}, '+msg, *args, **kwargs) @final def connect(self) -> None: self.mips_connect() @final async def connect_async(self) -> None: await self.mips_connect_async() @final def disconnect(self) -> None: self.mips_disconnect() self._request_map = {} self._msg_matcher = MIoTMatcher() self._device_state_sub_map = {} @final async def disconnect_async(self) -> None: await self.mips_disconnect_async() self._request_map = {} self._msg_matcher = MIoTMatcher() self._device_state_sub_map = {} @final def sub_prop( self, did: str, handler: Callable[[dict, any], None], siid: int = None, piid: int = None, handler_ctx: any = None ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/property/' f'{"#" if siid is None or piid is None else f"{siid}.{piid}"}') def on_prop_msg(topic: str, payload: str, ctx: any): msg: dict = json.loads(payload) if ( msg is None or 'did' not in msg or 'siid' not in msg or 'piid' not in msg or 'value' not in msg ): # self.log_error(f'on_prop_msg, recv unknown msg, {payload}') return if handler: self.log_debug('local, on properties_changed, %s', payload) handler(msg, ctx) return self.__reg_broadcast( topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx) @final def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/property/' f'{"#" if siid is None or piid is None else f"{siid}.{piid}"}') return self.__unreg_broadcast(topic=topic) @final def sub_event( self, did: str, handler: Callable[[dict, any], None], siid: int = None, eiid: int = None, handler_ctx: any = None ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/event/' f'{"#" if siid is None or eiid is None else f"{siid}.{eiid}"}') def on_event_msg(topic: str, payload: str, ctx: any): msg: dict = json.loads(payload) if ( msg is None or 'did' not in msg or 'siid' not in msg or 'eiid' not in msg or 'arguments' not in msg ): # self.log_error(f'on_event_msg, recv unknown msg, {payload}') return if handler: self.log_debug('local, on event_occurred, %s', payload) handler(msg, ctx) return self.__reg_broadcast( topic=topic, handler=on_event_msg, handler_ctx=handler_ctx) @final def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/event/' f'{"#" if siid is None or eiid is None else f"{siid}.{eiid}"}') return self.__unreg_broadcast(topic=topic) @final async def get_prop_safe_async( self, did: str, siid: int, piid: int, timeout_ms: int = 10000 ) -> any: self._get_prop_queue.setdefault(did, []) fut: asyncio.Future = self.main_loop.create_future() self._get_prop_queue[did].append({ 'param': json.dumps({ 'did': did, 'siid': siid, 'piid': piid }), 'fut': fut, 'timeout_ms': timeout_ms }) if self._get_prop_timer is None: self._get_prop_timer = self.main_loop.create_task( self.__get_prop_timer_handle()) return await fut @final async def get_prop_async( self, did: str, siid: int, piid: int, timeout_ms: int = 10000 ) -> any: result_obj = await self.__request_async( topic='proxy/get', payload=json.dumps({ 'did': did, 'siid': siid, 'piid': piid }), timeout_ms=timeout_ms) if not isinstance(result_obj, dict) or 'value' not in result_obj: return None return result_obj['value'] @final async def set_prop_async( self, did: str, siid: int, piid: int, value: any, timeout_ms: int = 10000 ) -> dict: payload_obj: dict = { 'did': did, 'rpc': { 'id': self.__gen_mips_id, 'method': 'set_properties', 'params': [{ 'did': did, 'siid': siid, 'piid': piid, 'value': value }] } } result_obj = await self.__request_async( topic='proxy/rpcReq', payload=json.dumps(payload_obj), timeout_ms=timeout_ms) if result_obj: if ( 'result' in result_obj and len(result_obj['result']) == 1 and 'did' in result_obj['result'][0] and result_obj['result'][0]['did'] == did and 'code' in result_obj['result'][0] ): return result_obj['result'][0] if 'error' in result_obj: return result_obj['error'] return { 'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value, 'message': 'Invalid result'} @final async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 ) -> dict: payload_obj: dict = { 'did': did, 'rpc': { 'id': self.__gen_mips_id, 'method': 'action', 'params': { 'did': did, 'siid': siid, 'aiid': aiid, 'in': in_list } } } result_obj = await self.__request_async( topic='proxy/rpcReq', payload=json.dumps(payload_obj), timeout_ms=timeout_ms) if result_obj: if 'result' in result_obj and 'code' in result_obj['result']: return result_obj['result'] if 'error' in result_obj: return result_obj['error'] return { 'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value, 'message': 'Invalid result'} @final async def get_dev_list_async( self, payload: str = None, timeout_ms: int = 10000 ) -> dict[str, dict]: result_obj = await self.__request_async( topic='proxy/getDevList', payload=payload or '{}', timeout_ms=timeout_ms) if not result_obj or 'devList' not in result_obj: return None device_list = {} for did, info in result_obj['devList'].items(): name: str = info.get('name', None) urn: str = info.get('urn', None) model: str = info.get('model', None) if name is None or urn is None or model is None: self.log_error(f'invalid device info, {did}, {info}') continue device_list[did] = { 'did': did, 'name': name, 'urn': urn, 'model': model, 'online': info.get('online', False), 'icon': info.get('icon', None), 'fw_version': None, 'home_id': '', 'home_name': '', 'room_id': info.get('roomId', ''), 'room_name': info.get('roomName', ''), 'specv2_access': info.get('specV2Access', False), 'push_available': info.get('pushAvailable', False), 'manufacturer': model.split('.')[0], } return device_list @final async def get_action_group_list_async( self, timeout_ms: int = 10000 ) -> list[str]: result_obj = await self.__request_async( topic='proxy/getMijiaActionGroupList', payload='{}', timeout_ms=timeout_ms) if not result_obj or 'result' not in result_obj: return None return result_obj['result'] @final async def exec_action_group_list_async( self, ag_id: str, timeout_ms: int = 10000 ) -> dict: result_obj = await self.__request_async( topic='proxy/execMijiaActionGroup', payload=f'{{"id":"{ag_id}"}}', timeout_ms=timeout_ms) if result_obj: if 'result' in result_obj: return result_obj['result'] if 'error' in result_obj: return result_obj['error'] return { 'code': MIoTErrorCode.CODE_MIPS_INVALID_RESULT.value, 'message': 'invalid result'} @final @property def on_dev_list_changed(self) -> Callable[[any, list[str]], asyncio.Future]: return self._on_dev_list_changed @final @on_dev_list_changed.setter def on_dev_list_changed( self, func: Callable[[any, list[str]], asyncio.Future] ) -> None: """run in main loop.""" self._on_dev_list_changed = func @final def __on_mips_cmd_handler(self, mips_cmd: MipsCmd) -> None: if mips_cmd.type_ == MipsCmdType.CALL_API: req_data: MipsRequestData = mips_cmd.data req = MipsRequest() req.mid = self.__gen_mips_id req.on_reply = req_data.on_reply req.on_reply_ctx = req_data.on_reply_ctx pub_topic: str = f'master/{req_data.topic}' result = self.__mips_publish( topic=pub_topic, payload=req_data.payload, mid=req.mid, ret_topic=self._reply_topic) self.log_debug( f'mips local call api, {result}, {req.mid}, {pub_topic}, ' f'{req_data.payload}') def on_request_timeout(req: MipsRequest): self.log_error( f'on mips request timeout, {req.mid}, {pub_topic}' f', {req_data.payload}') self._request_map.pop(str(req.mid), None) req.on_reply( '{"error":{"code":-10006, "message":"timeout"}}', req.on_reply_ctx) req.timer = self.mev_set_timeout( req_data.timeout_ms, on_request_timeout, req) self._request_map[str(req.mid)] = req elif mips_cmd.type_ == MipsCmdType.REG_BROADCAST: reg_bc: MipsRegBroadcast = mips_cmd.data sub_topic: str = f'{self._did}/{reg_bc.topic}' if not self._msg_matcher.get(sub_topic): sub_bc: MipsBroadcast = MipsBroadcast( topic=sub_topic, handler=reg_bc.handler, handler_ctx=reg_bc.handler_ctx) self._msg_matcher[sub_topic] = sub_bc self._mips_sub_internal(topic=f'master/{reg_bc.topic}') else: self.log_debug(f'mips re-reg broadcast, {sub_topic}') elif mips_cmd.type_ == MipsCmdType.UNREG_BROADCAST: unreg_bc: MipsRegBroadcast = mips_cmd.data # Central hub gateway needs to add prefix unsub_topic: str = f'{self._did}/{unreg_bc.topic}' if self._msg_matcher.get(unsub_topic): del self._msg_matcher[unsub_topic] self._mips_unsub_internal( topic=re.sub(f'^{self._did}', 'master', unsub_topic)) elif mips_cmd.type_ == MipsCmdType.REG_DEVICE_STATE: reg_dev_state: MipsRegDeviceState = mips_cmd.data self._device_state_sub_map[reg_dev_state.did] = reg_dev_state self.log_debug( f'mips local reg device state, {reg_dev_state.did}') elif mips_cmd.type_ == MipsCmdType.UNREG_DEVICE_STATE: unreg_dev_state: MipsRegDeviceState = mips_cmd.data del self._device_state_sub_map[unreg_dev_state.did] self.log_debug( f'mips local unreg device state, {unreg_dev_state.did}') else: self.log_error( f'mips local recv unknown cmd, {mips_cmd.type_}, ' f'{mips_cmd.data}') def __on_mips_connect_handler(self, rc, props) -> None: self.log_debug('__on_mips_connect_handler') # Sub did/#, include reply topic self._mips_sub_internal(f'{self._did}/#') # Sub device list change self._mips_sub_internal('master/appMsg/devListChange') # Do not need to subscribe api topics, for they are covered by did/# # Sub api topic. # Sub broadcast topic for topic, _ in list(self._msg_matcher.iter_all_nodes()): self._mips_sub_internal( topic=re.sub(f'^{self._did}', 'master', topic)) @final def __on_mips_message_handler(self, topic: str, payload: bytes) -> None: mips_msg: MipsMessage = MipsMessage.unpack(payload) # self.log_debug( # f"mips local client, on_message, {topic} -> {mips_msg}") # Reply if topic == self._reply_topic: self.log_debug(f'on request reply, {mips_msg}') req: MipsRequest = self._request_map.pop(str(mips_msg.mid), None) if req: # Cancel timer self.mev_clear_timeout(req.timer) if req.on_reply: self.main_loop.call_soon_threadsafe( req.on_reply, mips_msg.payload or '{}', req.on_reply_ctx) return # Broadcast bc_list: list[MipsBroadcast] = list(self._msg_matcher.iter_match( topic=topic)) if bc_list: self.log_debug(f'on broadcast, {topic}, {mips_msg}') for item in bc_list or []: if item.handler is None: continue self.main_loop.call_soon_threadsafe( item.handler, topic[topic.find('/')+1:], mips_msg.payload or '{}', item.handler_ctx) return # Device list change if topic == self._dev_list_change_topic: payload_obj: dict = json.loads(mips_msg.payload) dev_list = payload_obj.get('devList', None) if not isinstance(dev_list, list) or not dev_list: _LOGGER.error( 'unknown devListChange msg, %s', mips_msg.payload) return if self._on_dev_list_changed: self.main_loop.call_soon_threadsafe( self.main_loop.create_task, self._on_dev_list_changed(self, payload_obj['devList'])) return self.log_debug( f'mips local client, recv unknown msg, {topic} -> {mips_msg}') @property def __gen_mips_id(self) -> int: mips_id: int = self._mips_seed_id self._mips_seed_id = int((self._mips_seed_id+1) % self.UINT32_MAX) return mips_id def __mips_publish( self, topic: str, payload: str | bytes, mid: int = None, ret_topic: str = None, wait_for_publish: bool = False, timeout_ms: int = 10000 ) -> bool: mips_msg: bytes = MipsMessage.pack( mid=mid or self.__gen_mips_id, payload=payload, msg_from='local', ret_topic=ret_topic) return self._mips_publish_internal( topic=topic.strip(), payload=mips_msg, wait_for_publish=wait_for_publish, timeout_ms=timeout_ms) def __request( self, topic: str, payload: str, on_reply: Callable[[str, any], None], on_reply_ctx: any = None, timeout_ms: int = 10000 ) -> bool: if topic is None or payload is None or on_reply is None: raise MIoTMipsError('invalid params') req_data: MipsRequestData = MipsRequestData() req_data.topic = topic req_data.payload = payload req_data.on_reply = on_reply req_data.on_reply_ctx = on_reply_ctx req_data.timeout_ms = timeout_ms return self._mips_send_cmd(type_=MipsCmdType.CALL_API, data=req_data) def __reg_broadcast( self, topic: str, handler: Callable[[str, str, any], None], handler_ctx: any ) -> bool: return self._mips_send_cmd( type_=MipsCmdType.REG_BROADCAST, data=MipsRegBroadcast( topic=topic, handler=handler, handler_ctx=handler_ctx)) def __unreg_broadcast(self, topic) -> bool: return self._mips_send_cmd( type_=MipsCmdType.UNREG_BROADCAST, data=MipsRegBroadcast(topic=topic)) @final async def __request_async( self, topic: str, payload: str, timeout_ms: int = 10000 ) -> dict: fut_handler: asyncio.Future = self.main_loop.create_future() def on_msg_reply(payload: str, ctx: any): fut: asyncio.Future = ctx if fut: self.main_loop.call_soon_threadsafe(fut.set_result, payload) if not self.__request( topic=topic, payload=payload, on_reply=on_msg_reply, on_reply_ctx=fut_handler, timeout_ms=timeout_ms): # Request error fut_handler.set_result('internal request error') result = await fut_handler try: return json.loads(result) except json.JSONDecodeError: return { 'code': MIoTErrorCode.CODE_MIPS_INVALID_RESULT.value, 'message': f'Error: {result}'} async def __get_prop_timer_handle(self) -> None: for did in list(self._get_prop_queue.keys()): item = self._get_prop_queue[did].pop() _LOGGER.debug('get prop, %s, %s', did, item) result_obj = await self.__request_async( topic='proxy/get', payload=item['param'], timeout_ms=item['timeout_ms']) if result_obj is None or 'value' not in result_obj: item['fut'].set_result(None) else: item['fut'].set_result(result_obj['value']) if not self._get_prop_queue[did]: self._get_prop_queue.pop(did, None) if self._get_prop_queue: self._get_prop_timer = self.main_loop.call_later( 0.1, lambda: self.main_loop.create_task( self.__get_prop_timer_handle())) else: self._get_prop_timer = None