diff --git a/custom_components/xiaomi_home/miot/miot_client.py b/custom_components/xiaomi_home/miot/miot_client.py index b618ea59..58fb504a 100644 --- a/custom_components/xiaomi_home/miot/miot_client.py +++ b/custom_components/xiaomi_home/miot/miot_client.py @@ -357,7 +357,7 @@ async def deinit_async(self) -> None: # Cloud mips self._mips_cloud.unsub_mips_state( key=f'{self._uid}-{self._cloud_server}') - self._mips_cloud.disconnect() + self._mips_cloud.deinit() # Cancel refresh cloud devices if self._refresh_cloud_devices_timer: self._refresh_cloud_devices_timer.cancel() @@ -370,7 +370,7 @@ async def deinit_async(self) -> None: for mips in self._mips_local.values(): mips.on_dev_list_changed = None mips.unsub_mips_state(key=mips.group_id) - mips.disconnect() + mips.deinit() if self._mips_local_state_changed_timers: for timer_item in ( self._mips_local_state_changed_timers.values()): diff --git a/custom_components/xiaomi_home/miot/miot_ev.py b/custom_components/xiaomi_home/miot/miot_ev.py deleted file mode 100644 index c0cc97f5..00000000 --- a/custom_components/xiaomi_home/miot/miot_ev.py +++ /dev/null @@ -1,324 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Copyright (C) 2024 Xiaomi Corporation. - -The ownership and intellectual property rights of Xiaomi Home Assistant -Integration and related Xiaomi cloud service API interface provided under this -license, including source code and object code (collectively, "Licensed Work"), -are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi -hereby grants you a personal, limited, non-exclusive, non-transferable, -non-sublicensable, and royalty-free license to reproduce, use, modify, and -distribute the Licensed Work only for your use of Home Assistant for -non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize -you to use the Licensed Work for any other purpose, including but not limited -to use Licensed Work to develop applications (APP), Web services, and other -forms of software. - -You may reproduce and distribute copies of the Licensed Work, with or without -modifications, whether in source or object form, provided that you must give -any other recipients of the Licensed Work a copy of this License and retain all -copyright and disclaimers. - -Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR -CONDITIONS OF ANY KIND, either express or implied, including, without -limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR -OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or -FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible -for any direct, indirect, special, incidental, or consequential damages or -losses arising from the use or inability to use the Licensed Work. - -Xiaomi reserves all rights not expressly granted to you in this License. -Except for the rights expressly granted by Xiaomi under this License, Xiaomi -does not authorize you in any form to use the trademarks, copyrights, or other -forms of intellectual property rights of Xiaomi and its affiliates, including, -without limitation, without obtaining other written permission from Xiaomi, you -shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that -may make the public associate with Xiaomi in any form to publicize or promote -the software or hardware devices that use the Licensed Work. - -Xiaomi has the right to immediately terminate all your authorization under this -License in the event: -1. You assert patent invalidation, litigation, or other claims against patents -or other intellectual property rights of Xiaomi or its affiliates; or, -2. You make, have made, manufacture, sell, or offer to sell products that knock -off Xiaomi or its affiliates' products. - -MIoT event loop. -""" -import selectors -import heapq -import time -import traceback -from typing import Any, Callable, TypeVar -import logging -import threading - -# pylint: disable=relative-beyond-top-level -from .miot_error import MIoTEvError - -_LOGGER = logging.getLogger(__name__) - -TimeoutHandle = TypeVar('TimeoutHandle') - - -class MIoTFdHandler: - """File descriptor handler.""" - fd: int - read_handler: Callable[[Any], None] - read_handler_ctx: Any - write_handler: Callable[[Any], None] - write_handler_ctx: Any - - def __init__( - self, fd: int, - read_handler: Callable[[Any], None] = None, - read_handler_ctx: Any = None, - write_handler: Callable[[Any], None] = None, - write_handler_ctx: Any = None - ) -> None: - self.fd = fd - self.read_handler = read_handler - self.read_handler_ctx = read_handler_ctx - self.write_handler = write_handler - self.write_handler_ctx = write_handler_ctx - - -class MIoTTimeout: - """Timeout handler.""" - key: TimeoutHandle - target: int - handler: Callable[[Any], None] - handler_ctx: Any - - def __init__( - self, key: str = None, target: int = None, - handler: Callable[[Any], None] = None, - handler_ctx: Any = None - ) -> None: - self.key = key - self.target = target - self.handler = handler - self.handler_ctx = handler_ctx - - def __lt__(self, other): - return self.target < other.target - - -class MIoTEventLoop: - """MIoT event loop.""" - _poll_fd: selectors.DefaultSelector - - _fd_handlers: dict[str, MIoTFdHandler] - - _timer_heap: list[MIoTTimeout] - _timer_handlers: dict[str, MIoTTimeout] - _timer_handle_seed: int - - # Label if the current fd handler is freed inside a read handler to - # avoid invalid reading. - _fd_handler_freed_in_read_handler: bool - - def __init__(self) -> None: - self._poll_fd = selectors.DefaultSelector() - self._timer_heap = [] - self._timer_handlers = {} - self._timer_handle_seed = 1 - self._fd_handlers = {} - self._fd_handler_freed_in_read_handler = False - - def loop_forever(self) -> None: - """Run an event loop in current thread.""" - next_timeout: int - while True: - next_timeout = 0 - # Handle timer - now_ms: int = self.__get_monotonic_ms - while len(self._timer_heap) > 0: - timer: MIoTTimeout = self._timer_heap[0] - if timer is None: - break - if timer.target <= now_ms: - heapq.heappop(self._timer_heap) - del self._timer_handlers[timer.key] - if timer.handler: - timer.handler(timer.handler_ctx) - else: - next_timeout = timer.target-now_ms - break - # Are there any files to listen to - if next_timeout == 0 and self._fd_handlers: - next_timeout = None # None == infinite - # Wait for timers & fds - if next_timeout == 0: - # Neither timer nor fds exist, exit loop - break - # Handle fd event - events = self._poll_fd.select( - timeout=next_timeout/1000.0 if next_timeout else next_timeout) - for key, mask in events: - fd_handler: MIoTFdHandler = key.data - if fd_handler is None: - continue - self._fd_handler_freed_in_read_handler = False - fd_key = str(id(fd_handler.fd)) - if fd_key not in self._fd_handlers: - continue - if ( - mask & selectors.EVENT_READ > 0 - and fd_handler.read_handler - ): - fd_handler.read_handler(fd_handler.read_handler_ctx) - if ( - mask & selectors.EVENT_WRITE > 0 - and self._fd_handler_freed_in_read_handler is False - and fd_handler.write_handler - ): - fd_handler.write_handler(fd_handler.write_handler_ctx) - - def loop_stop(self) -> None: - """Stop the event loop.""" - if self._poll_fd: - self._poll_fd.close() - self._poll_fd = None - self._fd_handlers = {} - self._timer_heap = [] - self._timer_handlers = {} - - def set_timeout( - self, timeout_ms: int, handler: Callable[[Any], None], - handler_ctx: Any = None - ) -> TimeoutHandle: - """Set a timer.""" - if timeout_ms is None or handler is None: - raise MIoTEvError('invalid params') - new_timeout: MIoTTimeout = MIoTTimeout() - new_timeout.key = self.__get_next_timeout_handle - new_timeout.target = self.__get_monotonic_ms + timeout_ms - new_timeout.handler = handler - new_timeout.handler_ctx = handler_ctx - heapq.heappush(self._timer_heap, new_timeout) - self._timer_handlers[new_timeout.key] = new_timeout - return new_timeout.key - - def clear_timeout(self, timer_key: TimeoutHandle) -> None: - """Stop and remove the timer.""" - if timer_key is None: - return - timer: MIoTTimeout = self._timer_handlers.pop(timer_key, None) - if timer: - self._timer_heap = list(self._timer_heap) - self._timer_heap.remove(timer) - heapq.heapify(self._timer_heap) - - def set_read_handler( - self, fd: int, handler: Callable[[Any], None], handler_ctx: Any = None - ) -> bool: - """Set a read handler for a file descriptor. - - Returns: - bool: True, success. False, failed. - """ - self.__set_handler( - fd, is_read=True, handler=handler, handler_ctx=handler_ctx) - - def set_write_handler( - self, fd: int, handler: Callable[[Any], None], handler_ctx: Any = None - ) -> bool: - """Set a write handler for a file descriptor. - - Returns: - bool: True, success. False, failed. - """ - self.__set_handler( - fd, is_read=False, handler=handler, handler_ctx=handler_ctx) - - def __set_handler( - self, fd, is_read: bool, handler: Callable[[Any], None], - handler_ctx: Any = None - ) -> bool: - """Set a handler.""" - if fd is None: - raise MIoTEvError('invalid params') - - if not self._poll_fd: - raise MIoTEvError('event loop not started') - - fd_key: str = str(id(fd)) - fd_handler = self._fd_handlers.get(fd_key, None) - - if fd_handler is None: - fd_handler = MIoTFdHandler(fd=fd) - fd_handler.fd = fd - self._fd_handlers[fd_key] = fd_handler - - read_handler_existed = fd_handler.read_handler is not None - write_handler_existed = fd_handler.write_handler is not None - if is_read is True: - fd_handler.read_handler = handler - fd_handler.read_handler_ctx = handler_ctx - else: - fd_handler.write_handler = handler - fd_handler.write_handler_ctx = handler_ctx - - if fd_handler.read_handler is None and fd_handler.write_handler is None: - # Remove from epoll and map - try: - self._poll_fd.unregister(fd) - except (KeyError, ValueError, OSError) as e: - del e - self._fd_handlers.pop(fd_key, None) - # May be inside a read handler, if not, this has no effect - self._fd_handler_freed_in_read_handler = True - elif read_handler_existed is False and write_handler_existed is False: - # Add to epoll - events = 0x0 - if fd_handler.read_handler: - events |= selectors.EVENT_READ - if fd_handler.write_handler: - events |= selectors.EVENT_WRITE - try: - self._poll_fd.register(fd, events=events, data=fd_handler) - except (KeyError, ValueError, OSError) as e: - _LOGGER.error( - '%s, register fd, error, %s, %s, %s, %s, %s', - threading.current_thread().name, - 'read' if is_read else 'write', - fd_key, handler, e, traceback.format_exc()) - self._fd_handlers.pop(fd_key, None) - return False - elif ( - read_handler_existed != (fd_handler.read_handler is not None) - or write_handler_existed != (fd_handler.write_handler is not None) - ): - # Modify epoll - events = 0x0 - if fd_handler.read_handler: - events |= selectors.EVENT_READ - if fd_handler.write_handler: - events |= selectors.EVENT_WRITE - try: - self._poll_fd.modify(fd, events=events, data=fd_handler) - except (KeyError, ValueError, OSError) as e: - _LOGGER.error( - '%s, modify fd, error, %s, %s, %s, %s, %s', - threading.current_thread().name, - 'read' if is_read else 'write', - fd_key, handler, e, traceback.format_exc()) - self._fd_handlers.pop(fd_key, None) - return False - - return True - - @property - def __get_next_timeout_handle(self) -> str: - # Get next timeout handle, that is not larger than the maximum - # value of UINT64 type. - self._timer_handle_seed += 1 - # uint64 max - self._timer_handle_seed %= 0xFFFFFFFFFFFFFFFF - return str(self._timer_handle_seed) - - @property - def __get_monotonic_ms(self) -> int: - """Get monotonic ms timestamp.""" - return int(time.monotonic()*1000) diff --git a/custom_components/xiaomi_home/miot/miot_i18n.py b/custom_components/xiaomi_home/miot/miot_i18n.py index 152bc08d..b6e96f42 100644 --- a/custom_components/xiaomi_home/miot/miot_i18n.py +++ b/custom_components/xiaomi_home/miot/miot_i18n.py @@ -48,7 +48,7 @@ import asyncio import logging import os -from typing import Optional +from typing import Optional, Union # pylint: disable=relative-beyond-top-level from .common import load_json_file @@ -98,7 +98,7 @@ async def deinit_async(self) -> None: def translate( self, key: str, replace: Optional[dict[str, str]] = None - ) -> str | dict | None: + ) -> Union[str, dict, None]: result = self._data for item in key.split('.'): if item not in result: diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index 31911669..fd9ff47f 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -381,7 +381,8 @@ def __update_keep_alive(self, state: _MIoTLanDeviceState) -> None: _MIoTLanDeviceState(state.value+1)) # Fast ping if self._if_name is None: - _LOGGER.error('if_name is Not set for device, %s', self.did) + _LOGGER.error( + 'if_name is Not set for device, %s', self.did) return if self.ip is None: _LOGGER.error('ip is Not set for device, %s', self.did) @@ -419,10 +420,10 @@ def __change_online(self, online: bool) -> None: self.online = True else: _LOGGER.info('unstable device detected, %s', self.did) - self._online_offline_timer = \ + self._online_offline_timer = ( self._manager.internal_loop.call_later( self.NETWORK_UNSTABLE_RESUME_TH, - self.__online_resume_handler) + self.__online_resume_handler)) def __online_resume_handler(self) -> None: _LOGGER.info('unstable resume threshold past, %s', self.did) @@ -508,9 +509,9 @@ def __init__( key='miot_lan', group_id='*', handler=self.__on_mips_service_change) self._enable_subscribe = enable_subscribe - self._virtual_did = str(virtual_did) \ - if (virtual_did is not None) \ - else str(secrets.randbits(64)) + self._virtual_did = ( + str(virtual_did) if (virtual_did is not None) + else str(secrets.randbits(64))) # Init socket probe message probe_bytes = bytearray(self.OT_PROBE_LEN) probe_bytes[:20] = ( @@ -948,7 +949,7 @@ async def __on_mips_service_change( # The following methods SHOULD ONLY be called in the internal loop - def ping(self, if_name: str | None, target_ip: str) -> None: + def ping(self, if_name: Optional[str], target_ip: str) -> None: if not target_ip: return self.__sendto( @@ -964,7 +965,7 @@ def send2device( ) -> None: if timeout_ms and not handler: raise ValueError('handler is required when timeout_ms is set') - device: _MIoTLanDevice | None = self._lan_devices.get(did) + device: Optional[_MIoTLanDevice] = self._lan_devices.get(did) if not device: raise ValueError('invalid device') if not device.cipher: @@ -1232,7 +1233,7 @@ def __raw_message_handler( return # Keep alive message did: str = str(struct.unpack('>Q', data[4:12])[0]) - device: _MIoTLanDevice | None = self._lan_devices.get(did) + device: Optional[_MIoTLanDevice] = self._lan_devices.get(did) if not device: return timestamp: int = struct.unpack('>I', data[12:16])[0] @@ -1272,8 +1273,8 @@ def __message_handler(self, did: str, msg: dict) -> None: _LOGGER.warning('invalid message, no id, %s, %s', did, msg) return # Reply - req: _MIoTLanRequestData | None = \ - self._pending_requests.pop(msg['id'], None) + req: Optional[_MIoTLanRequestData] = ( + self._pending_requests.pop(msg['id'], None)) if req: if req.timeout: req.timeout.cancel() @@ -1334,7 +1335,7 @@ def __filter_dup_message(self, did: str, msg_id: int) -> bool: return False def __sendto( - self, if_name: str | None, data: bytes, address: str, port: int + self, if_name: Optional[str], data: bytes, address: str, port: int ) -> None: if if_name is None: # Broadcast @@ -1356,7 +1357,7 @@ def __scan_devices(self) -> None: try: # Scan devices self.ping(if_name=None, target_ip='255.255.255.255') - except Exception as err: # pylint: disable=broad-exception-caught + except Exception as err: # pylint: disable=broad-exception-caught # Ignore any exceptions to avoid blocking the loop _LOGGER.error('ping device error, %s', err) pass diff --git a/custom_components/xiaomi_home/miot/miot_mips.py b/custom_components/xiaomi_home/miot/miot_mips.py index 6c6b3580..1cade876 100644 --- a/custom_components/xiaomi_home/miot/miot_mips.py +++ b/custom_components/xiaomi_home/miot/miot_mips.py @@ -48,8 +48,6 @@ import asyncio import json import logging -import os -import queue import random import re import ssl @@ -58,24 +56,24 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum, auto -from typing import Any, Callable, Optional, final +from typing import Any, Callable, Optional, final, Coroutine from paho.mqtt.client import ( MQTT_ERR_SUCCESS, MQTT_ERR_UNKNOWN, Client, - MQTTv5) + MQTTv5, + MQTTMessage) # pylint: disable=relative-beyond-top-level from .common import MIoTMatcher from .const import MIHOME_MQTT_KEEPALIVE from .miot_error import MIoTErrorCode, MIoTMipsError -from .miot_ev import MIoTEventLoop, TimeoutHandle _LOGGER = logging.getLogger(__name__) -class MipsMsgTypeOptions(Enum): +class _MipsMsgTypeOptions(Enum): """MIoT Pub/Sub message type.""" ID = 0 RET_TOPIC = auto() @@ -84,16 +82,16 @@ class MipsMsgTypeOptions(Enum): MAX = auto() -class MipsMessage: +class _MipsMessage: """MIoT Pub/Sub message.""" mid: int = 0 - msg_from: str = None - ret_topic: str = None - payload: str = None + msg_from: Optional[str] = None + ret_topic: Optional[str] = None + payload: Optional[str] = None @staticmethod - def unpack(data: bytes): - mips_msg = MipsMessage() + def unpack(data: bytes) -> '_MipsMessage': + mips_msg = _MipsMessage() data_len = len(data) data_start = 0 data_end = 0 @@ -104,15 +102,15 @@ def unpack(data: bytes): unpack_data = data[data_end:data_end+unpack_len] # string end with \x00 match unpack_type: - case MipsMsgTypeOptions.ID.value: + case _MipsMsgTypeOptions.ID.value: mips_msg.mid = int.from_bytes( unpack_data, byteorder='little') - case MipsMsgTypeOptions.RET_TOPIC.value: + case _MipsMsgTypeOptions.RET_TOPIC.value: mips_msg.ret_topic = str( unpack_data.strip(b'\x00'), 'utf-8') - case MipsMsgTypeOptions.PAYLOAD.value: + case _MipsMsgTypeOptions.PAYLOAD.value: mips_msg.payload = str(unpack_data.strip(b'\x00'), 'utf-8') - case MipsMsgTypeOptions.FROM.value: + case _MipsMsgTypeOptions.FROM.value: mips_msg.msg_from = str( unpack_data.strip(b'\x00'), 'utf-8') case _: @@ -122,155 +120,73 @@ def unpack(data: bytes): @staticmethod def pack( - mid: int, payload: str, msg_from: str = None, ret_topic: str = None + mid: int, + payload: str, + msg_from: Optional[str] = None, + ret_topic: Optional[str] = None ) -> bytes: if mid is None or payload is None: raise MIoTMipsError('invalid mid or payload') pack_msg: bytes = b'' # mid - pack_msg += struct.pack(' str: return f'{self.mid}, {self.msg_from}, {self.ret_topic}, {self.payload}' -class MipsCmdType(Enum): - """MIoT Pub/Sub command type.""" - CONNECT = 0 - DISCONNECT = auto() - DEINIT = auto() - SUB = auto() - UNSUB = auto() - CALL_API = auto() - REG_BROADCAST = auto() - UNREG_BROADCAST = auto() - - REG_MIPS_STATE = auto() - UNREG_MIPS_STATE = auto() - REG_DEVICE_STATE = auto() - UNREG_DEVICE_STATE = auto() - - @dataclass -class MipsCmd: - """MIoT Pub/Sub command.""" - type_: MipsCmdType - data: Any - - def __init__(self, type_: MipsCmdType, data: Any) -> None: - self.type_ = type_ - self.data = data - - -@dataclass -class MipsRequest: +class _MipsRequest: """MIoT Pub/Sub request.""" - mid: int = None - on_reply: Callable[[str, Any], None] = None - on_reply_ctx: Any = None - timer: TimeoutHandle = None - - -@dataclass -class MipsRequestData: - """MIoT Pub/Sub request data.""" - topic: str = None - payload: str = None - on_reply: Callable[[str, Any], None] = None - on_reply_ctx: Any = None - timeout_ms: int = None - - -@dataclass -class MipsSendBroadcastData: - """MIoT Pub/Sub send broadcast data.""" - topic: str = None - payload: str = None - - -@dataclass -class MipsIncomingApiCall: - """MIoT Pub/Sub incoming API call.""" - mid: int = None - ret_topic: str = None - timer: TimeoutHandle = None - - -@dataclass -class MipsApi: - """MIoT Pub/Sub API.""" - topic: str = None - """ - param1: session - param2: payload - param3: handler_ctx - """ - handler: Callable[[MipsIncomingApiCall, str, Any], None] = None - handler_ctx: Any = None - - -class MipsRegApi(MipsApi): - """.MIoT Pub/Sub register API.""" + mid: int + on_reply: Callable[[str, Any], None] + on_reply_ctx: Any + timer: Optional[asyncio.TimerHandle] @dataclass -class MipsReplyData: - """MIoT Pub/Sub reply data.""" - session: MipsIncomingApiCall = None - payload: str = None - - -@dataclass -class MipsBroadcast: +class _MipsBroadcast: """MIoT Pub/Sub broadcast.""" - topic: str = None + topic: str """ param 1: msg topic param 2: msg payload param 3: handle_ctx """ - handler: Callable[[str, str, Any], None] = None - handler_ctx: Any = None + handler: Callable[[str, str, Any], None] + handler_ctx: Any def __str__(self) -> str: return f'{self.topic}, {id(self.handler)}, {id(self.handler_ctx)}' -class MipsRegBroadcast(MipsBroadcast): - """MIoT Pub/Sub register broadcast.""" - - @dataclass -class MipsState: +class _MipsState: """MIoT Pub/Sub state.""" - key: str = None + key: str """ str: key bool: mips connect state """ - handler: Callable[[str, bool], asyncio.Future] = None - - -class MipsRegState(MipsState): - """MIoT Pub/Sub register state.""" + handler: Callable[[str, bool], Coroutine] class MIoTDeviceState(Enum): @@ -283,69 +199,66 @@ class MIoTDeviceState(Enum): @dataclass class MipsDeviceState: """MIoT Pub/Sub device state.""" - did: str = None + did: Optional[str] = None """handler str: did MIoTDeviceState: online/offline/disable Any: ctx """ - handler: Callable[[str, MIoTDeviceState, Any], None] = None + handler: Optional[Callable[[str, MIoTDeviceState, Any], None]] = None handler_ctx: Any = None -class MipsRegDeviceState(MipsDeviceState): - """MIoT Pub/Sub register device state.""" - - -class MipsClient(ABC): +class _MipsClient(ABC): """MIoT Pub/Sub client.""" # pylint: disable=unused-argument - MQTT_INTERVAL_MS = 1000 + MQTT_INTERVAL_S = 1 MIPS_QOS: int = 2 UINT32_MAX: int = 0xFFFFFFFF - MIPS_RECONNECT_INTERVAL_MIN: int = 30000 - MIPS_RECONNECT_INTERVAL_MAX: int = 600000 + MIPS_RECONNECT_INTERVAL_MIN: float = 30 + MIPS_RECONNECT_INTERVAL_MAX: float = 600 MIPS_SUB_PATCH: int = 300 - MIPS_SUB_INTERVAL: int = 1000 + MIPS_SUB_INTERVAL: float = 1 main_loop: asyncio.AbstractEventLoop - _logger: logging.Logger + _logger: Optional[logging.Logger] _client_id: str _host: str _port: int - _username: str - _password: str - _ca_file: str - _cert_file: str - _key_file: str - - _mqtt_logger: logging.Logger + _username: Optional[str] + _password: Optional[str] + _ca_file: Optional[str] + _cert_file: Optional[str] + _key_file: Optional[str] + _tls_done: bool + + _mqtt_logger: Optional[logging.Logger] _mqtt: Client _mqtt_fd: int - _mqtt_timer: TimeoutHandle + _mqtt_timer: Optional[asyncio.TimerHandle] _mqtt_state: bool _event_connect: asyncio.Event _event_disconnect: asyncio.Event - _mev: MIoTEventLoop - _mips_thread: threading.Thread - _mips_queue: queue.Queue - _cmd_event_fd: os.eventfd + _internal_loop: asyncio.AbstractEventLoop + _mips_thread: Optional[threading.Thread] _mips_reconnect_tag: bool - _mips_reconnect_interval: int - _mips_reconnect_timer: Optional[TimeoutHandle] - _mips_state_sub_map: dict[str, MipsState] + _mips_reconnect_interval: float + _mips_reconnect_timer: Optional[asyncio.TimerHandle] + _mips_state_sub_map: dict[str, _MipsState] + _mips_state_sub_map_lock: threading.Lock _mips_sub_pending_map: dict[str, int] - _mips_sub_pending_timer: Optional[TimeoutHandle] - - _on_mips_cmd: Callable[[MipsCmd], None] - _on_mips_message: Callable[[str, bytes], None] - _on_mips_connect: Callable[[int, dict], None] - _on_mips_disconnect: Callable[[int, dict], None] + _mips_sub_pending_timer: Optional[asyncio.TimerHandle] def __init__( - self, client_id: str, host: str, port: int, - username: str = None, password: str = None, - ca_file: str = None, cert_file: str = None, key_file: str = None, + self, + client_id: str, + host: str, + port: int, + username: Optional[str] = None, + password: Optional[str] = None, + ca_file: Optional[str] = None, + cert_file: Optional[str] = None, + key_file: Optional[str] = None, loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: # MUST run with running loop @@ -359,6 +272,7 @@ def __init__( self._ca_file = ca_file self._cert_file = cert_file self._key_file = key_file + self._tls_done = False self._mqtt_logger = None self._mqtt_fd = -1 @@ -372,26 +286,15 @@ def __init__( # Mips init self._event_connect = asyncio.Event() self._event_disconnect = asyncio.Event() + self._mips_thread = None self._mips_reconnect_tag = False self._mips_reconnect_interval = 0 self._mips_reconnect_timer = None self._mips_state_sub_map = {} + self._mips_state_sub_map_lock = threading.Lock() self._mips_sub_pending_map = {} self._mips_sub_pending_timer = None - self._mev = MIoTEventLoop() - self._mips_queue = queue.Queue() - self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK) - self.mev_set_read_handler( - self._cmd_event_fd, self.__mips_cmd_read_handler, None) - self._mips_thread = threading.Thread(target=self.__mips_loop_thread) - self._mips_thread.daemon = True - self._mips_thread.name = self._client_id - self._mips_thread.start() - - self._on_mips_cmd = None - self._on_mips_message = None - self._on_mips_connect = None - self._on_mips_disconnect = None + # DO NOT start the thread yet. Do that on connect @property def client_id(self) -> str: @@ -415,29 +318,54 @@ def mips_state(self) -> bool: """ return self._mqtt and self._mqtt.is_connected() - @final - def mips_deinit(self) -> None: - self._mips_send_cmd(type_=MipsCmdType.DEINIT, data=None) + def connect(self, thread_name: Optional[str] = None) -> None: + """mips connect.""" + # Start mips thread + if self._mips_thread: + return + self._internal_loop = asyncio.new_event_loop() + self._mips_thread = threading.Thread(target=self.__mips_loop_thread) + self._mips_thread.daemon = True + self._mips_thread.name = ( + self._client_id if thread_name is None else thread_name) + self._mips_thread.start() + + async def connect_async(self) -> None: + """mips connect async.""" + self.connect() + await self._event_connect.wait() + + def disconnect(self) -> None: + """mips disconnect.""" + if not self._mips_thread: + return + self._internal_loop.call_soon_threadsafe(self.__mips_disconnect) self._mips_thread.join() self._mips_thread = None + self._internal_loop.close() + + async def disconnect_async(self) -> None: + """mips disconnect async.""" + self.disconnect() + await self._event_disconnect.wait() + + @final + def deinit(self) -> None: + self.disconnect() self._logger = None - self._client_id = None - self._host = None - self._port = None self._username = None self._password = None self._ca_file = None self._cert_file = None self._key_file = None + self._tls_done = False self._mqtt_logger = None - self._mips_state_sub_map = None - self._mips_sub_pending_map = None + with self._mips_state_sub_map_lock: + self._mips_state_sub_map.clear() + self._mips_sub_pending_map.clear() self._mips_sub_pending_timer = None - self._event_connect = None - self._event_disconnect = None - def update_mqtt_password(self, password: str) -> None: self._password = password self._mqtt.username_pw_set( @@ -466,166 +394,74 @@ def enable_mqtt_logger( else: self._mqtt.disable_logger() - @final - def mips_connect(self) -> None: - """mips connect.""" - return self._mips_send_cmd(type_=MipsCmdType.CONNECT, data=None) - - @final - async def mips_connect_async(self) -> None: - """mips connect async.""" - self._mips_send_cmd(type_=MipsCmdType.CONNECT, data=None) - return await self._event_connect.wait() - - @final - def mips_disconnect(self) -> None: - """mips disconnect.""" - return self._mips_send_cmd(type_=MipsCmdType.DISCONNECT, data=None) - - @final - async def mips_disconnect_async(self) -> None: - """mips disconnect async.""" - self._mips_send_cmd(type_=MipsCmdType.DISCONNECT, data=None) - return await self._event_disconnect.wait() - @final def sub_mips_state( - self, key: str, handler: Callable[[str, bool], asyncio.Future] + self, key: str, handler: Callable[[str, bool], Coroutine] ) -> bool: """Subscribe mips state. NOTICE: callback to main loop thread + This will be called before the client is connected. + So use mutex instead of IPC. """ if isinstance(key, str) is False or handler is None: raise MIoTMipsError('invalid params') - return self._mips_send_cmd( - type_=MipsCmdType.REG_MIPS_STATE, - data=MipsRegState(key=key, handler=handler)) + state = _MipsState(key=key, handler=handler) + with self._mips_state_sub_map_lock: + self._mips_state_sub_map[key] = state + self.log_debug(f'mips register mips state, {key}') + return True @final def unsub_mips_state(self, key: str) -> bool: """Unsubscribe mips state.""" if isinstance(key, str) is False: raise MIoTMipsError('invalid params') - return self._mips_send_cmd( - type_=MipsCmdType.UNREG_MIPS_STATE, data=MipsRegState(key=key)) - - @final - def mev_set_timeout( - self, timeout_ms: int, handler: Callable[[Any], None], - handler_ctx: Any = None - ) -> Optional[TimeoutHandle]: - """set timeout. - NOTICE: Internal function, only mips threads are allowed to call - """ - if self._mev is None: - return None - return self._mev.set_timeout( - timeout_ms=timeout_ms, handler=handler, handler_ctx=handler_ctx) - - @final - def mev_clear_timeout(self, handle: TimeoutHandle) -> None: - """clear timeout. - NOTICE: Internal function, only mips threads are allowed to call - """ - if self._mev is None: - return - self._mev.clear_timeout(handle) - - @final - def mev_set_read_handler( - self, fd: int, handler: Callable[[Any], None], handler_ctx: Any - ) -> bool: - """set read handler. - NOTICE: Internal function, only mips threads are allowed to call - """ - if self._mev is None: - return False - return self._mev.set_read_handler( - fd=fd, handler=handler, handler_ctx=handler_ctx) - - @final - def mev_set_write_handler( - self, fd: int, handler: Callable[[Any], None], handler_ctx: Any - ) -> bool: - """set write handler. - NOTICE: Internal function, only mips threads are allowed to call - """ - if self._mev is None: - return False - return self._mev.set_write_handler( - fd=fd, handler=handler, handler_ctx=handler_ctx) - - @property - def on_mips_cmd(self) -> Callable[[MipsCmd], None]: - return self._on_mips_cmd - - @on_mips_cmd.setter - def on_mips_cmd(self, handler: Callable[[MipsCmd], None]) -> None: - """MUST set after __init__ done. - NOTICE thread safe, this function will be called at the **mips** thread - """ - self._on_mips_cmd = handler - - @property - def on_mips_message(self) -> Callable[[str, bytes], None]: - return self._on_mips_message - - @on_mips_message.setter - def on_mips_message(self, handler: Callable[[str, bytes], None]) -> None: - """MUST set after __init__ done. - NOTICE thread safe, this function will be called at the **mips** thread - """ - self._on_mips_message = handler - - @property - def on_mips_connect(self) -> Callable[[int, dict], None]: - return self._on_mips_connect - - @on_mips_connect.setter - def on_mips_connect(self, handler: Callable[[int, dict], None]) -> None: - """MUST set after __init__ done. - NOTICE thread safe, this function will be called at the - **main loop** thread - """ - self._on_mips_connect = handler - - @property - def on_mips_disconnect(self) -> Callable[[int, dict], None]: - return self._on_mips_disconnect - - @on_mips_disconnect.setter - def on_mips_disconnect(self, handler: Callable[[int, dict], None]) -> None: - """MUST set after __init__ done. - NOTICE thread safe, this function will be called at the - **main loop** thread - """ - self._on_mips_disconnect = handler + with self._mips_state_sub_map_lock: + del self._mips_state_sub_map[key] + self.log_debug(f'mips unregister mips state, {key}') + return True @abstractmethod def sub_prop( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, piid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + piid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: ... @abstractmethod def unsub_prop( - self, did: str, siid: int = None, piid: int = None + self, + did: str, + siid: Optional[int] = None, + piid: Optional[int] = None ) -> bool: ... @abstractmethod def sub_event( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, eiid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + eiid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: ... @abstractmethod def unsub_event( - self, did: str, siid: int = None, eiid: int = None + self, + did: str, + siid: Optional[int] = None, + eiid: Optional[int] = None ) -> bool: ... @abstractmethod async def get_dev_list_async( - self, payload: str = None, timeout_ms: int = 10000 + self, + payload: Optional[str] = None, + timeout_ms: int = 10000 ) -> dict[str, dict]: ... @abstractmethod @@ -637,13 +473,22 @@ async def get_prop_async( async def set_prop_async( self, did: str, siid: int, piid: int, value: Any, timeout_ms: int = 10000 - ) -> bool: ... + ) -> dict: ... @abstractmethod async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 - ) -> tuple[bool, list]: ... + ) -> dict: ... + + @abstractmethod + def _on_mips_message(self, topic: str, payload: bytes) -> None: ... + + @abstractmethod + def _on_mips_connect(self, rc: int, props: dict) -> None: ... + + @abstractmethod + def _on_mips_disconnect(self, rc: int, props: dict) -> None: ... @final def _mips_sub_internal(self, topic: str) -> None: @@ -657,8 +502,8 @@ def _mips_sub_internal(self, topic: str) -> None: if topic not in self._mips_sub_pending_map: self._mips_sub_pending_map[topic] = 0 if not self._mips_sub_pending_timer: - self._mips_sub_pending_timer = self.mev_set_timeout( - 10, self.__mips_sub_internal_pending_handler, topic) + self._mips_sub_pending_timer = self._internal_loop.call_later( + 0.01, self.__mips_sub_internal_pending_handler, topic) except Exception as err: # pylint: disable=broad-exception-caught # Catch all exception self.log_error(f'mips sub internal error, {topic}. {err}') @@ -707,75 +552,24 @@ def _mips_publish_internal( self.log_error(f'mips publish internal error, {err}') return False - @final - def _mips_send_cmd(self, type_: MipsCmdType, data: Any) -> bool: - if self._mips_queue is None or self._cmd_event_fd is None: - raise MIoTMipsError('send mips cmd disable') - # Put data to queue - self._mips_queue.put(MipsCmd(type_=type_, data=data)) - # Write event fd - os.eventfd_write(self._cmd_event_fd, 1) - # self.log_debug(f'send mips cmd, {type}, {data}') - return True - def __thread_check(self) -> None: if threading.current_thread() is not self._mips_thread: raise MIoTMipsError('illegal call') - def __mips_cmd_read_handler(self, ctx: Any) -> None: - fd_value = os.eventfd_read(self._cmd_event_fd) - if fd_value == 0: - return - while self._mips_queue.empty() is False: - mips_cmd: MipsCmd = self._mips_queue.get(block=False) - if mips_cmd.type_ == MipsCmdType.CONNECT: - self._mips_reconnect_tag = True - self.__mips_try_reconnect(immediately=True) - elif mips_cmd.type_ == MipsCmdType.DISCONNECT: - self._mips_reconnect_tag = False - self.__mips_disconnect() - elif mips_cmd.type_ == MipsCmdType.DEINIT: - self.log_info('mips client recv deinit cmd') - self.__mips_disconnect() - # Close cmd event fd - if self._cmd_event_fd: - self.mev_set_read_handler( - self._cmd_event_fd, None, None) - os.close(self._cmd_event_fd) - self._cmd_event_fd = None - if self._mips_queue: - self._mips_queue = None - # ev loop stop - if self._mev: - self._mev.loop_stop() - self._mev = None - break - elif mips_cmd.type_ == MipsCmdType.REG_MIPS_STATE: - state: MipsState = mips_cmd.data - self._mips_state_sub_map[state.key] = state - self.log_debug(f'mips register mips state, {state.key}') - elif mips_cmd.type_ == MipsCmdType.UNREG_MIPS_STATE: - state: MipsState = mips_cmd.data - del self._mips_state_sub_map[state.key] - self.log_debug(f'mips unregister mips state, {state.key}') - else: - if self._on_mips_cmd: - self._on_mips_cmd(mips_cmd=mips_cmd) - - def __mqtt_read_handler(self, ctx: Any) -> None: - self.__mqtt_loop_handler(ctx=ctx) + def __mqtt_read_handler(self) -> None: + self.__mqtt_loop_handler() - def __mqtt_write_handler(self, ctx: Any) -> None: - self.mev_set_write_handler(self._mqtt_fd, None, None) - self.__mqtt_loop_handler(ctx=ctx) + def __mqtt_write_handler(self) -> None: + self._internal_loop.remove_writer(self._mqtt_fd) + self.__mqtt_loop_handler() - def __mqtt_timer_handler(self, ctx: Any) -> None: - self.__mqtt_loop_handler(ctx=ctx) + def __mqtt_timer_handler(self) -> None: + self.__mqtt_loop_handler() if self._mqtt: - self._mqtt_timer = self.mev_set_timeout( - self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None) + self._mqtt_timer = self._internal_loop.call_later( + self.MQTT_INTERVAL_S, self.__mqtt_timer_handler) - def __mqtt_loop_handler(self, ctx: Any) -> None: + def __mqtt_loop_handler(self) -> None: try: if self._mqtt: self._mqtt.loop_read() @@ -784,8 +578,8 @@ def __mqtt_loop_handler(self, ctx: Any) -> None: if self._mqtt: self._mqtt.loop_misc() if self._mqtt and self._mqtt.want_write(): - self.mev_set_write_handler( - self._mqtt_fd, self.__mqtt_write_handler, None) + self._internal_loop.add_writer( + self._mqtt_fd, self.__mqtt_write_handler) except Exception as err: # pylint: disable=broad-exception-caught # Catch all exception self.log_error(f'__mqtt_loop_handler, {err}') @@ -797,25 +591,29 @@ def __mips_loop_thread(self) -> None: if self._username: self._mqtt.username_pw_set( username=self._username, password=self._password) - if ( - self._ca_file - and self._cert_file - and self._key_file - ): - self._mqtt.tls_set( - tls_version=ssl.PROTOCOL_TLS_CLIENT, - ca_certs=self._ca_file, - certfile=self._cert_file, - keyfile=self._key_file) - else: - self._mqtt.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT) - self._mqtt.tls_insecure_set(True) + if not self._tls_done: + if ( + self._ca_file + and self._cert_file + and self._key_file + ): + self._mqtt.tls_set( + tls_version=ssl.PROTOCOL_TLS_CLIENT, + ca_certs=self._ca_file, + certfile=self._cert_file, + keyfile=self._key_file) + else: + self._mqtt.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT) + self._mqtt.tls_insecure_set(True) + self._tls_done = True self._mqtt.on_connect = self.__on_connect self._mqtt.on_connect_fail = self.__on_connect_failed self._mqtt.on_disconnect = self.__on_disconnect self._mqtt.on_message = self.__on_message + # Connect to mips + self.__mips_start_connect_tries() # Run event loop - self._mev.loop_forever() + self._internal_loop.run_forever() self.log_info('mips_loop_thread exit!') def __on_connect(self, client, user_data, flags, rc, props) -> None: @@ -823,23 +621,23 @@ def __on_connect(self, client, user_data, flags, rc, props) -> None: return self.log_info(f'mips connect, {flags}, {rc}, {props}') self._mqtt_state = True - if self._on_mips_connect: - self.mev_set_timeout( - timeout_ms=0, - handler=lambda ctx: - self._on_mips_connect(rc, props)) - for item in self._mips_state_sub_map.values(): - if item.handler is None: - continue - self.main_loop.call_soon_threadsafe( - self.main_loop.create_task, - item.handler(item.key, True)) + self._internal_loop.call_soon( + self._on_mips_connect, rc, props) + with self._mips_state_sub_map_lock: + for item in self._mips_state_sub_map.values(): + if item.handler is None: + continue + self.main_loop.call_soon_threadsafe( + self.main_loop.create_task, + item.handler(item.key, True)) # Resolve future - self._event_connect.set() - self._event_disconnect.clear() + self.main_loop.call_soon_threadsafe( + self._event_connect.set) + self.main_loop.call_soon_threadsafe( + self._event_disconnect.clear) - def __on_connect_failed(self, client, user_data, flags, rc) -> None: - self.log_error(f'mips connect failed, {flags}, {rc}') + def __on_connect_failed(self, client: Client, user_data: Any) -> None: + self.log_error('mips connect failed') # Try to reconnect self.__mips_try_reconnect() @@ -848,53 +646,44 @@ def __on_disconnect(self, client, user_data, rc, props) -> None: self.log_error(f'mips disconnect, {rc}, {props}') self._mqtt_state = False if self._mqtt_timer: - self.mev_clear_timeout(self._mqtt_timer) + self._mqtt_timer.cancel() self._mqtt_timer = None if self._mqtt_fd != -1: - self.mev_set_read_handler(self._mqtt_fd, None, None) - self.mev_set_write_handler(self._mqtt_fd, None, None) + self._internal_loop.remove_reader(self._mqtt_fd) + self._internal_loop.remove_writer(self._mqtt_fd) self._mqtt_fd = -1 # Clear retry sub if self._mips_sub_pending_timer: - self.mev_clear_timeout(self._mips_sub_pending_timer) + self._mips_sub_pending_timer.cancel() self._mips_sub_pending_timer = None self._mips_sub_pending_map = {} - if self._on_mips_disconnect: - self.mev_set_timeout( - timeout_ms=0, - handler=lambda ctx: - self._on_mips_disconnect(rc, props)) + self._internal_loop.call_soon( + self._on_mips_disconnect, rc, props) # Call state sub handler - for item in self._mips_state_sub_map.values(): - if item.handler is None: - continue - self.main_loop.call_soon_threadsafe( - self.main_loop.create_task, - item.handler(item.key, False)) + with self._mips_state_sub_map_lock: + for item in self._mips_state_sub_map.values(): + if item.handler is None: + continue + self.main_loop.call_soon_threadsafe( + self.main_loop.create_task, + item.handler(item.key, False)) # Try to reconnect self.__mips_try_reconnect() # Set event - self._event_disconnect.set() - self._event_connect.clear() - - def __on_message(self, client, user_data, msg) -> None: + self.main_loop.call_soon_threadsafe( + self._event_disconnect.set) + self.main_loop.call_soon_threadsafe( + self._event_connect.clear) + + def __on_message( + self, + client: Client, + user_data: Any, + msg: MQTTMessage + ) -> None: self._on_mips_message(topic=msg.topic, payload=msg.payload) - def __mips_try_reconnect(self, immediately: bool = False) -> None: - if self._mips_reconnect_timer: - self.mev_clear_timeout(self._mips_reconnect_timer) - self._mips_reconnect_timer = None - if not self._mips_reconnect_tag: - return - interval: int = 0 - if not immediately: - interval = self.__get_next_reconnect_time() - self.log_error( - 'mips try reconnect after %sms', interval) - self._mips_reconnect_timer = self.mev_set_timeout( - interval, self.__mips_connect, None) - def __mips_sub_internal_pending_handler(self, ctx: Any) -> None: subbed_count = 1 for topic in list(self._mips_sub_pending_map.keys()): @@ -916,25 +705,25 @@ def __mips_sub_internal_pending_handler(self, ctx: Any) -> None: f'retry mips sub internal, {count}, {topic}, {result}, {mid}') if len(self._mips_sub_pending_map): - self._mips_sub_pending_timer = self.mev_set_timeout( + self._mips_sub_pending_timer = self._internal_loop.call_later( self.MIPS_SUB_INTERVAL, self.__mips_sub_internal_pending_handler, None) else: self._mips_sub_pending_timer = None - def __mips_connect(self, ctx: Any = None) -> None: + def __mips_connect(self) -> None: result = MQTT_ERR_UNKNOWN if self._mips_reconnect_timer: - self.mev_clear_timeout(self._mips_reconnect_timer) + self._mips_reconnect_timer.cancel() self._mips_reconnect_timer = None try: # Try clean mqtt fd before mqtt connect if self._mqtt_timer: - self.mev_clear_timeout(self._mqtt_timer) + self._mqtt_timer.cancel() self._mqtt_timer = None if self._mqtt_fd != -1: - self.mev_set_read_handler(self._mqtt_fd, None, None) - self.mev_set_write_handler(self._mqtt_fd, None, None) + self._internal_loop.remove_reader(self._mqtt_fd) + self._internal_loop.remove_writer(self._mqtt_fd) self._mqtt_fd = -1 result = self._mqtt.connect( host=self._host, port=self._port, @@ -944,33 +733,59 @@ def __mips_connect(self, ctx: Any = None) -> None: self.log_error('__mips_connect, connect error, %s', error) if result == MQTT_ERR_SUCCESS: - self._mqtt_fd = self._mqtt.socket() + socket = self._mqtt.socket() + if socket is None: + self.log_error( + '__mips_connect, connect success, but socket is None') + self.__mips_try_reconnect() + return + self._mqtt_fd = socket.fileno() self.log_debug(f'__mips_connect, _mqtt_fd, {self._mqtt_fd}') - self.mev_set_read_handler( - self._mqtt_fd, self.__mqtt_read_handler, None) + self._internal_loop.add_reader( + self._mqtt_fd, self.__mqtt_read_handler) if self._mqtt.want_write(): - self.mev_set_write_handler( - self._mqtt_fd, self.__mqtt_write_handler, None) - self._mqtt_timer = self.mev_set_timeout( - self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None) + self._internal_loop.add_writer( + self._mqtt_fd, self.__mqtt_write_handler) + self._mqtt_timer = self._internal_loop.call_later( + self.MQTT_INTERVAL_S, self.__mqtt_timer_handler) else: self.log_error(f'__mips_connect error result, {result}') self.__mips_try_reconnect() + def __mips_try_reconnect(self, immediately: bool = False) -> None: + if self._mips_reconnect_timer: + self._mips_reconnect_timer.cancel() + self._mips_reconnect_timer = None + if not self._mips_reconnect_tag: + return + interval: float = 0 + if not immediately: + interval = self.__get_next_reconnect_time() + self.log_error( + 'mips try reconnect after %ss', interval) + self._mips_reconnect_timer = self._internal_loop.call_later( + interval, self.__mips_connect) + + def __mips_start_connect_tries(self) -> None: + self._mips_reconnect_tag = True + self.__mips_try_reconnect(immediately=True) + def __mips_disconnect(self) -> None: + self._mips_reconnect_tag = False if self._mips_reconnect_timer: - self.mev_clear_timeout(self._mips_reconnect_timer) + self._mips_reconnect_timer.cancel() self._mips_reconnect_timer = None if self._mqtt_timer: - self.mev_clear_timeout(self._mqtt_timer) + self._mqtt_timer.cancel() self._mqtt_timer = None if self._mqtt_fd != -1: - self.mev_set_read_handler(self._mqtt_fd, None, None) - self.mev_set_write_handler(self._mqtt_fd, None, None) + self._internal_loop.remove_reader(self._mqtt_fd) + self._internal_loop.remove_writer(self._mqtt_fd) self._mqtt_fd = -1 self._mqtt.disconnect() + self._internal_loop.stop() - def __get_next_reconnect_time(self) -> int: + def __get_next_reconnect_time(self) -> float: if self._mips_reconnect_interval == 0: self._mips_reconnect_interval = self.MIPS_RECONNECT_INTERVAL_MIN else: @@ -980,7 +795,7 @@ def __get_next_reconnect_time(self) -> int: return self._mips_reconnect_interval -class MipsCloudClient(MipsClient): +class MipsCloudClient(_MipsClient): """MIoT Pub/Sub Cloud Client.""" # pylint: disable=unused-argument # pylint: disable=inconsistent-quotes @@ -996,45 +811,25 @@ def __init__( client_id=f'ha.{uuid}', host=f'{cloud_server}-ha.mqtt.io.mi.com', port=port, username=app_id, password=token, loop=loop) - self.on_mips_cmd = self.__on_mips_cmd_handler - self.on_mips_message = self.__on_mips_message_handler - self.on_mips_connect = self.__on_mips_connect_handler - self.on_mips_disconnect = self.__on_mips_disconnect_handler - - def deinit(self) -> None: - self.mips_deinit() - self._msg_matcher = None - self.on_mips_cmd = None - self.on_mips_message = None - self.on_mips_connect = None - - @final - def connect(self) -> None: - self.mips_connect() - - @final - async def connect_async(self) -> None: - await self.mips_connect_async() - @final def disconnect(self) -> None: - self.mips_disconnect() - self._msg_matcher = MIoTMatcher() - - @final - async def disconnect_async(self) -> None: - await self.mips_disconnect_async() + super().disconnect() self._msg_matcher = MIoTMatcher() def update_access_token(self, access_token: str) -> bool: if not isinstance(access_token, str): raise MIoTMipsError('invalid token') - return self.update_mqtt_password(password=access_token) + self.update_mqtt_password(password=access_token) + return True @final def sub_prop( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, piid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + piid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: if not isinstance(did, str) or handler is None: raise MIoTMipsError('invalid params') @@ -1043,7 +838,7 @@ def sub_prop( f'device/{did}/up/properties_changed/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') - def on_prop_msg(topic: str, payload: str, ctx: Any) -> bool: + def on_prop_msg(topic: str, payload: str, ctx: Any) -> None: try: msg: dict = json.loads(payload) except json.JSONDecodeError: @@ -1062,22 +857,31 @@ def on_prop_msg(topic: str, payload: str, ctx: Any) -> bool: if handler: self.log_debug('on properties_changed, %s', payload) handler(msg['params'], ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx) @final - def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: + def unsub_prop( + self, + did: str, + siid: Optional[int] = None, + piid: Optional[int] = None + ) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') topic: str = ( f'device/{did}/up/properties_changed/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) @final def sub_event( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, eiid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + eiid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: if not isinstance(did, str) or handler is None: raise MIoTMipsError('invalid params') @@ -1086,7 +890,7 @@ def sub_event( f'device/{did}/up/event_occured/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') - def on_event_msg(topic: str, payload: str, ctx: Any) -> bool: + def on_event_msg(topic: str, payload: str, ctx: Any) -> None: try: msg: dict = json.loads(payload) except json.JSONDecodeError: @@ -1106,18 +910,23 @@ def on_event_msg(topic: str, payload: str, ctx: Any) -> bool: self.log_debug('on on_event_msg, %s', payload) msg['params']['from'] = 'cloud' handler(msg['params'], ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_event_msg, handler_ctx=handler_ctx) @final - def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: + def unsub_event( + self, + did: str, + siid: Optional[int] = None, + eiid: Optional[int] = None + ) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') # Spelling error: event_occured topic: str = ( f'device/{did}/up/event_occured/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) @final def sub_device_state( @@ -1145,7 +954,7 @@ def on_state_msg(topic: str, payload: str, ctx: Any) -> None: handler( did, MIoTDeviceState.ONLINE if msg['event'] == 'online' else MIoTDeviceState.OFFLINE, ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_state_msg, handler_ctx=handler_ctx) @final @@ -1153,10 +962,10 @@ def unsub_device_state(self, did: str) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') topic: str = f'device/{did}/state/#' - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) async def get_dev_list_async( - self, payload: str = None, timeout_ms: int = 10000 + self, payload: Optional[str] = None, timeout_ms: int = 10000 ) -> dict[str, dict]: raise NotImplementedError('please call in http client') @@ -1168,97 +977,95 @@ async def get_prop_async( async def set_prop_async( self, did: str, siid: int, piid: int, value: Any, timeout_ms: int = 10000 - ) -> bool: + ) -> dict: raise NotImplementedError('please call in http client') async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 - ) -> tuple[bool, list]: + ) -> dict: raise NotImplementedError('please call in http client') - def __on_mips_cmd_handler(self, mips_cmd: MipsCmd) -> None: - """ - NOTICE thread safe, this function will be called at the **mips** thread - """ - if mips_cmd.type_ == MipsCmdType.REG_BROADCAST: - reg_bc: MipsRegBroadcast = mips_cmd.data - if not self._msg_matcher.get(topic=reg_bc.topic): - sub_bc: MipsBroadcast = MipsBroadcast( - topic=reg_bc.topic, handler=reg_bc.handler, - handler_ctx=reg_bc.handler_ctx) - self._msg_matcher[reg_bc.topic] = sub_bc - self._mips_sub_internal(topic=reg_bc.topic) - else: - self.log_debug(f'mips cloud re-reg broadcast, {reg_bc.topic}') - elif mips_cmd.type_ == MipsCmdType.UNREG_BROADCAST: - unreg_bc: MipsRegBroadcast = mips_cmd.data - if self._msg_matcher.get(topic=unreg_bc.topic): - del self._msg_matcher[unreg_bc.topic] - self._mips_unsub_internal(topic=unreg_bc.topic) + def __reg_broadcast_external( + self, topic: str, handler: Callable[[str, str, Any], None], + handler_ctx: Any = None + ) -> bool: + self._internal_loop.call_soon_threadsafe( + self.__reg_broadcast, topic, handler, handler_ctx) + return True + + def __unreg_broadcast_external(self, topic: str) -> bool: + self._internal_loop.call_soon_threadsafe( + self.__unreg_broadcast, topic) + return True def __reg_broadcast( self, topic: str, handler: Callable[[str, str, Any], None], handler_ctx: Any = None - ) -> bool: - return self._mips_send_cmd( - type_=MipsCmdType.REG_BROADCAST, - data=MipsRegBroadcast( - topic=topic, handler=handler, handler_ctx=handler_ctx)) + ) -> None: + if not self._msg_matcher.get(topic=topic): + sub_bc: _MipsBroadcast = _MipsBroadcast( + topic=topic, handler=handler, + handler_ctx=handler_ctx) + self._msg_matcher[topic] = sub_bc + self._mips_sub_internal(topic=topic) + else: + self.log_debug(f'mips cloud re-reg broadcast, {topic}') - def __unreg_broadcast(self, topic: str) -> bool: - return self._mips_send_cmd( - type_=MipsCmdType.UNREG_BROADCAST, - data=MipsRegBroadcast(topic=topic)) + def __unreg_broadcast(self, topic: str) -> None: + if self._msg_matcher.get(topic=topic): + del self._msg_matcher[topic] + self._mips_unsub_internal(topic=topic) - def __on_mips_connect_handler(self, rc, props) -> None: + def _on_mips_connect(self, rc: int, props: dict) -> None: """sub topic.""" for topic, _ in list( self._msg_matcher.iter_all_nodes()): self._mips_sub_internal(topic=topic) - def __on_mips_disconnect_handler(self, rc, props) -> None: + def _on_mips_disconnect(self, rc: int, props: dict) -> None: """unsub topic.""" pass - def __on_mips_message_handler(self, topic: str, payload) -> None: + def _on_mips_message(self, topic: str, payload: bytes) -> None: """ NOTICE thread safe, this function will be called at the **mips** thread """ # broadcast - bc_list: list[MipsBroadcast] = list( + bc_list: list[_MipsBroadcast] = list( self._msg_matcher.iter_match(topic)) if not bc_list: return + # The message from the cloud is not packed. + payload_str: str = payload.decode('utf-8') # self.log_debug(f"on broadcast, {topic}, {payload}") for item in bc_list or []: if item.handler is None: continue # NOTICE: call threadsafe self.main_loop.call_soon_threadsafe( - item.handler, topic, payload, item.handler_ctx) + item.handler, topic, payload_str, item.handler_ctx) -class MipsLocalClient(MipsClient): +class MipsLocalClient(_MipsClient): """MIoT Pub/Sub Local Client.""" # pylint: disable=unused-argument # pylint: disable=inconsistent-quotes - MIPS_RECONNECT_INTERVAL_MIN: int = 6000 - MIPS_RECONNECT_INTERVAL_MAX: int = 60000 + MIPS_RECONNECT_INTERVAL_MIN: float = 6 + MIPS_RECONNECT_INTERVAL_MAX: float = 60 MIPS_SUB_PATCH: int = 1000 - MIPS_SUB_INTERVAL: int = 100 + MIPS_SUB_INTERVAL: float = 0.1 _did: str _group_id: str _home_name: str _mips_seed_id: int _reply_topic: str _dev_list_change_topic: str - _request_map: dict[str, MipsRequest] + _request_map: dict[str, _MipsRequest] _msg_matcher: MIoTMatcher - _device_state_sub_map: dict[str, MipsDeviceState] _get_prop_queue: dict[str, list] - _get_prop_timer: asyncio.TimerHandle - _on_dev_list_changed: Callable[[Any, list[str]], asyncio.Future] + _get_prop_timer: Optional[asyncio.TimerHandle] + _on_dev_list_changed: Optional[Callable[[Any, list[str]], Coroutine]] def __init__( self, did: str, host: str, group_id: str, @@ -1274,7 +1081,6 @@ def __init__( self._dev_list_change_topic = f'{did}/appMsg/devListChange' self._request_map = {} self._msg_matcher = MIoTMatcher() - self._device_state_sub_map = {} self._get_prop_queue = {} self._get_prop_timer = None self._on_dev_list_changed = None @@ -1282,34 +1088,11 @@ def __init__( super().__init__( client_id=did, host=host, port=port, ca_file=ca_file, cert_file=cert_file, key_file=key_file, loop=loop) - # MIPS local thread name use group_id - self._mips_thread.name = self._group_id - - self.on_mips_cmd = self.__on_mips_cmd_handler - self.on_mips_message = self.__on_mips_message_handler - self.on_mips_connect = self.__on_mips_connect_handler @property def group_id(self) -> str: return self._group_id - def deinit(self) -> None: - self.mips_deinit() - self._did = None - self._mips_seed_id = None - self._reply_topic = None - self._dev_list_change_topic = None - self._request_map = None - self._msg_matcher = None - self._device_state_sub_map = None - self._get_prop_queue = None - self._get_prop_timer = None - self._on_dev_list_changed = None - - self.on_mips_cmd = None - self.on_mips_message = None - self.on_mips_connect = None - def log_debug(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.debug(f'{self._home_name}, '+msg, *args, **kwargs) @@ -1323,31 +1106,24 @@ def log_error(self, msg, *args, **kwargs) -> None: self._logger.error(f'{self._home_name}, '+msg, *args, **kwargs) @final - def connect(self) -> None: - self.mips_connect() - - @final - async def connect_async(self) -> None: - await self.mips_connect_async() + def connect(self, thread_name: Optional[str] = None) -> None: + # MIPS local thread name use group_id + super().connect(self._group_id) @final def disconnect(self) -> None: - self.mips_disconnect() - self._request_map = {} - self._msg_matcher = MIoTMatcher() - self._device_state_sub_map = {} - - @final - async def disconnect_async(self) -> None: - await self.mips_disconnect_async() + super().disconnect() self._request_map = {} self._msg_matcher = MIoTMatcher() - self._device_state_sub_map = {} @final def sub_prop( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, piid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + piid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/property/' @@ -1367,20 +1143,29 @@ def on_prop_msg(topic: str, payload: str, ctx: Any): if handler: self.log_debug('local, on properties_changed, %s', payload) handler(msg, ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx) @final - def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: + def unsub_prop( + self, + did: str, + siid: Optional[int] = None, + piid: Optional[int] = None + ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/property/' f'{"#" if siid is None or piid is None else f"{siid}.{piid}"}') - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) @final def sub_event( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, eiid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + eiid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/event/' @@ -1400,15 +1185,20 @@ def on_event_msg(topic: str, payload: str, ctx: Any): if handler: self.log_debug('local, on event_occurred, %s', payload) handler(msg, ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_event_msg, handler_ctx=handler_ctx) @final - def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: + def unsub_event( + self, + did: str, + siid: Optional[int] = None, + eiid: Optional[int] = None + ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/event/' f'{"#" if siid is None or eiid is None else f"{siid}.{eiid}"}') - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) @final async def get_prop_safe_async( @@ -1426,7 +1216,9 @@ async def get_prop_safe_async( 'timeout_ms': timeout_ms }) if self._get_prop_timer is None: - self._get_prop_timer = self.main_loop.create_task( + self._get_prop_timer = self.main_loop.call_later( + 0.1, + self.main_loop.create_task, self.__get_prop_timer_handle()) return await fut @@ -1515,13 +1307,13 @@ async def action_async( @final async def get_dev_list_async( - self, payload: str = None, timeout_ms: int = 10000 + self, payload: Optional[str] = None, timeout_ms: int = 10000 ) -> dict[str, dict]: result_obj = await self.__request_async( topic='proxy/getDevList', payload=payload or '{}', timeout_ms=timeout_ms) if not result_obj or 'devList' not in result_obj: - return None + raise MIoTMipsError('invalid result') device_list = {} for did, info in result_obj['devList'].items(): name: str = info.get('name', None) @@ -1557,7 +1349,7 @@ async def get_action_group_list_async( payload='{}', timeout_ms=timeout_ms) if not result_obj or 'result' not in result_obj: - return None + raise MIoTMipsError('invalid result') return result_obj['result'] @final @@ -1579,79 +1371,73 @@ async def exec_action_group_list_async( @final @property - def on_dev_list_changed(self) -> Callable[[Any, list[str]], asyncio.Future]: + def on_dev_list_changed( + self + ) -> Optional[Callable[[Any, list[str]], Coroutine]]: return self._on_dev_list_changed @final @on_dev_list_changed.setter def on_dev_list_changed( - self, func: Callable[[Any, list[str]], asyncio.Future] + self, func: Callable[[Any, list[str]], Coroutine] ) -> None: """run in main loop.""" self._on_dev_list_changed = func - @final - def __on_mips_cmd_handler(self, mips_cmd: MipsCmd) -> None: - if mips_cmd.type_ == MipsCmdType.CALL_API: - req_data: MipsRequestData = mips_cmd.data - req = MipsRequest() - req.mid = self.__gen_mips_id - req.on_reply = req_data.on_reply - req.on_reply_ctx = req_data.on_reply_ctx - pub_topic: str = f'master/{req_data.topic}' - result = self.__mips_publish( - topic=pub_topic, payload=req_data.payload, mid=req.mid, - ret_topic=self._reply_topic) - self.log_debug( - f'mips local call api, {result}, {req.mid}, {pub_topic}, ' - f'{req_data.payload}') - - def on_request_timeout(req: MipsRequest): - self.log_error( - f'on mips request timeout, {req.mid}, {pub_topic}' - f', {req_data.payload}') - self._request_map.pop(str(req.mid), None) - req.on_reply( - '{"error":{"code":-10006, "message":"timeout"}}', - req.on_reply_ctx) - req.timer = self.mev_set_timeout( - req_data.timeout_ms, on_request_timeout, req) - self._request_map[str(req.mid)] = req - elif mips_cmd.type_ == MipsCmdType.REG_BROADCAST: - reg_bc: MipsRegBroadcast = mips_cmd.data - sub_topic: str = f'{self._did}/{reg_bc.topic}' - if not self._msg_matcher.get(sub_topic): - sub_bc: MipsBroadcast = MipsBroadcast( - topic=sub_topic, handler=reg_bc.handler, - handler_ctx=reg_bc.handler_ctx) - self._msg_matcher[sub_topic] = sub_bc - self._mips_sub_internal(topic=f'master/{reg_bc.topic}') - else: - self.log_debug(f'mips re-reg broadcast, {sub_topic}') - elif mips_cmd.type_ == MipsCmdType.UNREG_BROADCAST: - unreg_bc: MipsRegBroadcast = mips_cmd.data - # Central hub gateway needs to add prefix - unsub_topic: str = f'{self._did}/{unreg_bc.topic}' - if self._msg_matcher.get(unsub_topic): - del self._msg_matcher[unsub_topic] - self._mips_unsub_internal( - topic=re.sub(f'^{self._did}', 'master', unsub_topic)) - elif mips_cmd.type_ == MipsCmdType.REG_DEVICE_STATE: - reg_dev_state: MipsRegDeviceState = mips_cmd.data - self._device_state_sub_map[reg_dev_state.did] = reg_dev_state - self.log_debug( - f'mips local reg device state, {reg_dev_state.did}') - elif mips_cmd.type_ == MipsCmdType.UNREG_DEVICE_STATE: - unreg_dev_state: MipsRegDeviceState = mips_cmd.data - del self._device_state_sub_map[unreg_dev_state.did] - self.log_debug( - f'mips local unreg device state, {unreg_dev_state.did}') - else: + def __request( + self, topic: str, payload: str, + on_reply: Callable[[str, Any], None], + on_reply_ctx: Any = None, timeout_ms: int = 10000 + ) -> None: + req = _MipsRequest( + mid=self.__gen_mips_id, + on_reply=on_reply, + on_reply_ctx=on_reply_ctx, + timer=None) + pub_topic: str = f'master/{topic}' + result = self.__mips_publish( + topic=pub_topic, payload=payload, mid=req.mid, + ret_topic=self._reply_topic) + self.log_debug( + f'mips local call api, {result}, {req.mid}, {pub_topic}, ' + f'{payload}') + + def on_request_timeout(req: _MipsRequest): self.log_error( - f'mips local recv unknown cmd, {mips_cmd.type_}, ' - f'{mips_cmd.data}') + f'on mips request timeout, {req.mid}, {pub_topic}' + f', {payload}') + self._request_map.pop(str(req.mid), None) + req.on_reply( + '{"error":{"code":-10006, "message":"timeout"}}', + req.on_reply_ctx) + req.timer = self._internal_loop.call_later( + timeout_ms/1000, on_request_timeout, req) + self._request_map[str(req.mid)] = req - def __on_mips_connect_handler(self, rc, props) -> None: + def __reg_broadcast( + self, topic: str, handler: Callable[[str, str, Any], None], + handler_ctx: Any + ) -> None: + sub_topic: str = f'{self._did}/{topic}' + if not self._msg_matcher.get(sub_topic): + sub_bc: _MipsBroadcast = _MipsBroadcast( + topic=sub_topic, handler=handler, + handler_ctx=handler_ctx) + self._msg_matcher[sub_topic] = sub_bc + self._mips_sub_internal(topic=f'master/{topic}') + else: + self.log_debug(f'mips re-reg broadcast, {sub_topic}') + + def __unreg_broadcast(self, topic) -> None: + # Central hub gateway needs to add prefix + unsub_topic: str = f'{self._did}/{topic}' + if self._msg_matcher.get(unsub_topic): + del self._msg_matcher[unsub_topic] + self._mips_unsub_internal( + topic=re.sub(f'^{self._did}', 'master', unsub_topic)) + + @final + def _on_mips_connect(self, rc: int, props: dict) -> None: self.log_debug('__on_mips_connect_handler') # Sub did/#, include reply topic self._mips_sub_internal(f'{self._did}/#') @@ -1665,24 +1451,30 @@ def __on_mips_connect_handler(self, rc, props) -> None: topic=re.sub(f'^{self._did}', 'master', topic)) @final - def __on_mips_message_handler(self, topic: str, payload: bytes) -> None: - mips_msg: MipsMessage = MipsMessage.unpack(payload) + def _on_mips_disconnect(self, rc: int, props: dict) -> None: + pass + + @final + def _on_mips_message(self, topic: str, payload: bytes) -> None: + mips_msg: _MipsMessage = _MipsMessage.unpack(payload) # self.log_debug( # f"mips local client, on_message, {topic} -> {mips_msg}") # Reply if topic == self._reply_topic: self.log_debug(f'on request reply, {mips_msg}') - req: MipsRequest = self._request_map.pop(str(mips_msg.mid), None) + req: Optional[_MipsRequest] = self._request_map.pop( + str(mips_msg.mid), None) if req: # Cancel timer - self.mev_clear_timeout(req.timer) + if req.timer: + req.timer.cancel() if req.on_reply: self.main_loop.call_soon_threadsafe( req.on_reply, mips_msg.payload or '{}', req.on_reply_ctx) return # Broadcast - bc_list: list[MipsBroadcast] = list(self._msg_matcher.iter_match( + bc_list: list[_MipsBroadcast] = list(self._msg_matcher.iter_match( topic=topic)) if bc_list: self.log_debug(f'on broadcast, {topic}, {mips_msg}') @@ -1695,6 +1487,9 @@ def __on_mips_message_handler(self, topic: str, payload: bytes) -> None: return # Device list change if topic == self._dev_list_change_topic: + if mips_msg.payload is None: + self.log_error('devListChange msg is None') + return payload_obj: dict = json.loads(mips_msg.payload) dev_list = payload_obj.get('devList', None) if not isinstance(dev_list, list) or not dev_list: @@ -1704,7 +1499,7 @@ def __on_mips_message_handler(self, topic: str, payload: bytes) -> None: if self._on_dev_list_changed: self.main_loop.call_soon_threadsafe( self.main_loop.create_task, - self._on_dev_list_changed(self, payload_obj['devList'])) + self._on_dev_list_changed(self, dev_list)) return self.log_debug( @@ -1717,45 +1512,45 @@ def __gen_mips_id(self) -> int: return mips_id def __mips_publish( - self, topic: str, payload: str | bytes, mid: int = None, - ret_topic: str = None, wait_for_publish: bool = False, - timeout_ms: int = 10000 + self, + topic: str, + payload: str, + mid: Optional[int] = None, + ret_topic: Optional[str] = None, + wait_for_publish: bool = False, + timeout_ms: int = 10000 ) -> bool: - mips_msg: bytes = MipsMessage.pack( + mips_msg: bytes = _MipsMessage.pack( mid=mid or self.__gen_mips_id, payload=payload, msg_from='local', ret_topic=ret_topic) return self._mips_publish_internal( topic=topic.strip(), payload=mips_msg, wait_for_publish=wait_for_publish, timeout_ms=timeout_ms) - def __request( + def __request_external( self, topic: str, payload: str, on_reply: Callable[[str, Any], None], on_reply_ctx: Any = None, timeout_ms: int = 10000 ) -> bool: if topic is None or payload is None or on_reply is None: raise MIoTMipsError('invalid params') - req_data: MipsRequestData = MipsRequestData() - req_data.topic = topic - req_data.payload = payload - req_data.on_reply = on_reply - req_data.on_reply_ctx = on_reply_ctx - req_data.timeout_ms = timeout_ms - return self._mips_send_cmd(type_=MipsCmdType.CALL_API, data=req_data) + self._internal_loop.call_soon_threadsafe( + self.__request, topic, payload, on_reply, on_reply_ctx, timeout_ms) + return True - def __reg_broadcast( + def __reg_broadcast_external( self, topic: str, handler: Callable[[str, str, Any], None], handler_ctx: Any ) -> bool: - return self._mips_send_cmd( - type_=MipsCmdType.REG_BROADCAST, - data=MipsRegBroadcast( - topic=topic, handler=handler, handler_ctx=handler_ctx)) + self._internal_loop.call_soon_threadsafe( + self.__reg_broadcast, + topic, handler, handler_ctx) + return True - def __unreg_broadcast(self, topic) -> bool: - return self._mips_send_cmd( - type_=MipsCmdType.UNREG_BROADCAST, - data=MipsRegBroadcast(topic=topic)) + def __unreg_broadcast_external(self, topic) -> bool: + self._internal_loop.call_soon_threadsafe( + self.__unreg_broadcast, topic) + return True @final async def __request_async( @@ -1767,7 +1562,7 @@ def on_msg_reply(payload: str, ctx: Any): fut: asyncio.Future = ctx if fut: self.main_loop.call_soon_threadsafe(fut.set_result, payload) - if not self.__request( + if not self.__request_external( topic=topic, payload=payload, on_reply=on_msg_reply, diff --git a/test/conftest.py b/test/conftest.py index 92634029..64687f76 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -20,7 +20,6 @@ def load_py_file(): 'const.py', 'miot_cloud.py', 'miot_error.py', - 'miot_ev.py', 'miot_i18n.py', 'miot_lan.py', 'miot_mdns.py', diff --git a/test/test_ev.py b/test/test_ev.py deleted file mode 100644 index 6353fe8c..00000000 --- a/test/test_ev.py +++ /dev/null @@ -1,55 +0,0 @@ -# -*- coding: utf-8 -*- -"""Unit test for miot_ev.py.""" -import os -import pytest - -# pylint: disable=import-outside-toplevel, disable=unused-argument - - -@pytest.mark.github -def test_mev_timer_and_fd(): - from miot.miot_ev import MIoTEventLoop, TimeoutHandle - - mev = MIoTEventLoop() - assert mev - event_fd: os.eventfd = os.eventfd(0, os.O_NONBLOCK) - assert event_fd - timer4: TimeoutHandle = None - - def event_handler(event_fd): - value: int = os.eventfd_read(event_fd) - if value == 1: - mev.clear_timeout(timer4) - print('cancel timer4') - elif value == 2: - print('event write twice in a row') - elif value == 3: - mev.set_read_handler(event_fd, None, None) - os.close(event_fd) - event_fd = None - print('close event fd') - - def timer1_handler(event_fd): - os.eventfd_write(event_fd, 1) - - def timer2_handler(event_fd): - os.eventfd_write(event_fd, 1) - os.eventfd_write(event_fd, 1) - - def timer3_handler(event_fd): - os.eventfd_write(event_fd, 3) - - def timer4_handler(event_fd): - raise ValueError('unreachable code') - - mev.set_read_handler( - event_fd, event_handler, event_fd) - - mev.set_timeout(500, timer1_handler, event_fd) - mev.set_timeout(1000, timer2_handler, event_fd) - mev.set_timeout(1500, timer3_handler, event_fd) - timer4 = mev.set_timeout(2000, timer4_handler, event_fd) - - mev.loop_forever() - # Loop will exit when there are no timers or fd handlers. - mev.loop_stop()