From 99c48b1a94dd31d01aeeb9fa59d47ba24cdaa9a0 Mon Sep 17 00:00:00 2001 From: Starry-OvO Date: Sat, 4 Feb 2023 23:07:22 +0800 Subject: [PATCH] feat: websocket --- aiotieba/__init__.py | 1 + aiotieba/__version__.py | 2 +- aiotieba/client/_client.py | 12 +- aiotieba/client/_helper/__init__.py | 2 + aiotieba/client/_helper/_func.py | 69 ++++++ aiotieba/client/_websocket/_helper.py | 76 ------ aiotieba/client/get_group_msg/__init__.py | 2 +- aiotieba/client/get_group_msg/_api.py | 21 +- aiotieba/client/get_group_msg/_classdef.py | 228 ++++++++++++++++-- .../protobuf/GetGroupMsgReqIdl.proto | 6 +- .../protobuf/GetGroupMsgReqIdl_pb2.py | 10 +- .../protobuf/GetGroupMsgResIdl.proto | 27 ++- .../protobuf/GetGroupMsgResIdl_pb2.py | 16 +- aiotieba/client/init_websocket/__init__.py | 2 + .../{_websocket => init_websocket}/_api.py | 6 +- aiotieba/client/init_websocket/_classdef.py | 41 ++++ .../protobuf/UpdateClientInfoReqIdl.proto | 0 .../protobuf/UpdateClientInfoReqIdl_pb2.py | 0 .../protobuf/UpdateClientInfoResIdl.proto | 0 .../protobuf/UpdateClientInfoResIdl_pb2.py | 0 aiotieba/client/push_notify/__init__.py | 1 + aiotieba/client/push_notify/_api.py | 11 +- aiotieba/client/push_notify/_classdef.py | 75 ++++++ .../protobuf/PushNotifyResIdl.proto | 4 +- .../protobuf/PushNotifyResIdl_pb2.py | 12 +- .../{_websocket => websocket}/__init__.py | 0 .../{_websocket => websocket}/_classdef.py | 196 ++++++++++----- 27 files changed, 635 insertions(+), 185 deletions(-) delete mode 100644 aiotieba/client/_websocket/_helper.py create mode 100644 aiotieba/client/init_websocket/__init__.py rename aiotieba/client/{_websocket => init_websocket}/_api.py (87%) create mode 100644 aiotieba/client/init_websocket/_classdef.py rename aiotieba/client/{_websocket => init_websocket}/protobuf/UpdateClientInfoReqIdl.proto (100%) rename aiotieba/client/{_websocket => init_websocket}/protobuf/UpdateClientInfoReqIdl_pb2.py (100%) rename aiotieba/client/{_websocket => init_websocket}/protobuf/UpdateClientInfoResIdl.proto (100%) rename aiotieba/client/{_websocket => init_websocket}/protobuf/UpdateClientInfoResIdl_pb2.py (100%) create mode 100644 aiotieba/client/push_notify/_classdef.py rename aiotieba/client/{_websocket => websocket}/__init__.py (100%) rename aiotieba/client/{_websocket => websocket}/_classdef.py (51%) diff --git a/aiotieba/__init__.py b/aiotieba/__init__.py index 3824bfce..5ec7ddf8 100644 --- a/aiotieba/__init__.py +++ b/aiotieba/__init__.py @@ -33,6 +33,7 @@ TypeFragTiebaPlus, UserInfo_home, ) +from .client.websocket import Websocket if os.name == 'posix': import signal diff --git a/aiotieba/__version__.py b/aiotieba/__version__.py index 6eb6ff71..dd777d30 100644 --- a/aiotieba/__version__.py +++ b/aiotieba/__version__.py @@ -1 +1 @@ -__version__ = "3.1.5a2" +__version__ = "3.1.5a3" diff --git a/aiotieba/client/_client.py b/aiotieba/client/_client.py index b26e4edd..73a501d8 100644 --- a/aiotieba/client/_client.py +++ b/aiotieba/client/_client.py @@ -11,8 +11,8 @@ from ._core import TbCore from ._helper import ForumInfoCache, is_portrait from ._typing import TypeUserInfo -from ._websocket import Websocket from .get_homepage._classdef import UserInfo_home +from .websocket import Websocket if TYPE_CHECKING: import numpy as np @@ -75,6 +75,9 @@ def __init__( loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: + if loop is None: + loop = asyncio.get_running_loop() + self._core = TbCore(BDUSS_key, proxy, loop) self._user = UserInfo_home()._init_null() @@ -2059,13 +2062,12 @@ async def send_msg(self, _id: Union[str, int], content: str) -> bool: try: await self.websocket.init_websocket() - self.websocket._record_id += 1 from . import send_msg - proto = send_msg.pack_proto(user_id, content, self.websocket._record_id) - body = await self.websocket.send(proto, send_msg.CMD, timeout=5.0) - send_msg.parse_body(body) + proto = send_msg.pack_proto(user_id, content, self.websocket.record_id) + resp = await self.websocket.send(proto, send_msg.CMD) + send_msg.parse_body(await resp.read()) except Exception as err: LOG().warning(f"{err}. user_id={user_id}") diff --git a/aiotieba/client/_helper/__init__.py b/aiotieba/client/_helper/__init__.py index 4ebd66e2..b36293c2 100644 --- a/aiotieba/client/_helper/__init__.py +++ b/aiotieba/client/_helper/__init__.py @@ -11,7 +11,9 @@ pack_proto_request, pack_web_form_request, pack_web_get_request, + pack_ws_bytes, parse_json, + parse_ws_bytes, removeprefix, removesuffix, send_request, diff --git a/aiotieba/client/_helper/_func.py b/aiotieba/client/_helper/_func.py index b39e98af..b689edb3 100644 --- a/aiotieba/client/_helper/_func.py +++ b/aiotieba/client/_helper/_func.py @@ -271,6 +271,75 @@ def pack_web_form_request(core: TbCore, url: yarl.URL, data: List[Tuple[str, str return request +def pack_ws_bytes( + core: TbCore, data: bytes, cmd: int, req_id: int, *, compress: bool = False, encrypt: bool = True +) -> bytes: + """ + 打包数据并添加9字节头部 + + Args: + core (TiebaCore): 贴吧核心参数容器 + data (bytes): 待发送的websocket数据 + cmd (int): 请求的cmd类型 + req_id (int): 请求的id + compress (bool, optional): 是否需要gzip压缩. Defaults to False. + encrypt (bool, optional): 是否需要aes加密. Defaults to True. + + Returns: + bytes: 打包后的websocket数据 + """ + + flag = 0x08 + + if compress: + flag |= 0b01000000 + data = gzip.compress(data, compresslevel=-1, mtime=0) + if encrypt: + flag |= 0b10000000 + data = pad(data, AES.block_size) + data = core.aes_ecb_chiper.encrypt(data) + + data = b''.join( + [ + flag.to_bytes(1, 'big'), + cmd.to_bytes(4, 'big'), + req_id.to_bytes(4, 'big'), + data, + ] + ) + + return data + + +def parse_ws_bytes(core: TbCore, data: bytes) -> Tuple[bytes, int, int]: + """ + 对websocket返回数据进行解包 + + Args: + core (TiebaCore): 贴吧核心参数容器 + data (bytes): 接收到的websocket数据 + + Returns: + bytes: 解包后的websocket数据 + int: 对应请求的cmd类型 + int: 对应请求的id + """ + + data_view = memoryview(data) + flag = data_view[0] + cmd = int.from_bytes(data_view[1:5], 'big') + req_id = int.from_bytes(data_view[5:9], 'big') + + data = data_view[9:].tobytes() + if flag & 0b10000000: + data = core.aes_ecb_chiper.decrypt(data) + data = unpad(data, AES.block_size) + if flag & 0b01000000: + data = gzip.decompress(data) + + return data, cmd, req_id + + def check_status_code(response: aiohttp.ClientResponse) -> None: if response.status != 200: raise HTTPStatusError(response.status, response.reason) diff --git a/aiotieba/client/_websocket/_helper.py b/aiotieba/client/_websocket/_helper.py deleted file mode 100644 index 0f353794..00000000 --- a/aiotieba/client/_websocket/_helper.py +++ /dev/null @@ -1,76 +0,0 @@ -import gzip -from typing import Tuple - -from Crypto.Cipher import AES -from Crypto.Util.Padding import pad, unpad - -from .._core import TbCore - - -def pack_ws_bytes( - core: TbCore, data: bytes, cmd: int, req_id: int, *, compress: bool = False, encrypt: bool = True -) -> bytes: - """ - 打包数据并添加9字节头部 - - Args: - core (TiebaCore): 贴吧核心参数容器 - data (bytes): 待发送的websocket数据 - cmd (int): 请求的cmd类型 - req_id (int): 请求的id - compress (bool, optional): 是否需要gzip压缩. Defaults to False. - encrypt (bool, optional): 是否需要aes加密. Defaults to True. - - Returns: - bytes: 打包后的websocket数据 - """ - - flag = 0x08 - - if compress: - flag |= 0b01000000 - data = gzip.compress(data, compresslevel=-1, mtime=0) - if encrypt: - flag |= 0b10000000 - data = pad(data, AES.block_size) - data = core.aes_ecb_chiper.encrypt(data) - - data = b''.join( - [ - flag.to_bytes(1, 'big'), - cmd.to_bytes(4, 'big'), - req_id.to_bytes(4, 'big'), - data, - ] - ) - - return data - - -def parse_ws_bytes(core: TbCore, data: bytes) -> Tuple[bytes, int, int]: - """ - 对websocket返回数据进行解包 - - Args: - core (TiebaCore): 贴吧核心参数容器 - data (bytes): 接收到的websocket数据 - - Returns: - bytes: 解包后的websocket数据 - int: 对应请求的cmd类型 - int: 对应请求的id - """ - - data_view = memoryview(data) - flag = data_view[0] - cmd = int.from_bytes(data_view[1:5], 'big') - req_id = int.from_bytes(data_view[5:9], 'big') - - data = data_view[9:].tobytes() - if flag & 0b10000000: - data = core.aes_ecb_chiper.decrypt(data) - data = unpad(data, AES.block_size) - if flag & 0b01000000: - data = gzip.decompress(data) - - return data, cmd, req_id diff --git a/aiotieba/client/get_group_msg/__init__.py b/aiotieba/client/get_group_msg/__init__.py index 753c897f..a34f9e24 100644 --- a/aiotieba/client/get_group_msg/__init__.py +++ b/aiotieba/client/get_group_msg/__init__.py @@ -1,2 +1,2 @@ from ._api import CMD, pack_proto, parse_body -from ._classdef import MsgGroup +from ._classdef import GroupType diff --git a/aiotieba/client/get_group_msg/_api.py b/aiotieba/client/get_group_msg/_api.py index ba38cade..99409a52 100644 --- a/aiotieba/client/get_group_msg/_api.py +++ b/aiotieba/client/get_group_msg/_api.py @@ -2,27 +2,36 @@ from .._core import TbCore from .._exception import TiebaServerError -from ._classdef import MsgGroup +from ..push_notify import WsNotify +from ._classdef import WsMsgGroup from .protobuf import GetGroupMsgReqIdl_pb2, GetGroupMsgResIdl_pb2 CMD = 202003 -def pack_proto(core: TbCore, groups: List[MsgGroup], get_type: int) -> bytes: +def pack_proto(core: TbCore, notifies: List[WsNotify], get_type: int) -> bytes: req_proto = GetGroupMsgReqIdl_pb2.GetGroupMsgReqIdl() - for group in groups: + req_proto.data.width = 720 + req_proto.data.height = 1280 + req_proto.data.smallWidth = 240 + req_proto.data.smallHeight = 240 + for notify in notifies: group_proto = req_proto.data.groupMids.add() - group_proto.groupId = group._group_type - group_proto.lastMsgId = group._last_msg_id + group_proto.groupId = notify._group_id + group_proto.lastMsgId = notify._last_msg_id req_proto.data.gettype = str(get_type) req_proto.cuid = f"{core.cuid}|com.baidu.tieba_mini{core.post_version}" return req_proto.SerializeToString() -def parse_body(body: bytes) -> None: +def parse_body(body: bytes) -> List[WsMsgGroup]: res_proto = GetGroupMsgResIdl_pb2.GetGroupMsgResIdl() res_proto.ParseFromString(body) if code := res_proto.error.errorno: raise TiebaServerError(code, res_proto.error.errmsg) + + groups = [WsMsgGroup()._init(p) for p in res_proto.data.groupInfo] + + return groups diff --git a/aiotieba/client/get_group_msg/_classdef.py b/aiotieba/client/get_group_msg/_classdef.py index aa5c2fc0..51c1c167 100644 --- a/aiotieba/client/get_group_msg/_classdef.py +++ b/aiotieba/client/get_group_msg/_classdef.py @@ -1,38 +1,238 @@ -class MsgGroup(object): +import enum +from typing import List + +from .._classdef import TypeMessage + + +class GroupType(enum.IntEnum): + """ + 消息组类型 + """ + + PRIVATE_MSG = 6 + MISC = 8 + + +class UserInfo_ws(object): + """ + 用户信息 + + Attributes: + user_id (int): user_id + portrait (str): portrait + user_name (str): 用户名 + + log_name (str): 用于在日志中记录用户信息 + """ + + __slots__ = [ + '_user_id', + '_portrait', + '_user_name', + ] + + def _init(self, data_proto: TypeMessage) -> "UserInfo_ws": + self._user_id = data_proto.userId + if '?' in (portrait := data_proto.portrait): + self._portrait = portrait[:-13] + else: + self._portrait = portrait + self._user_name = data_proto.userName + return self + + def _init_null(self) -> "UserInfo_ws": + self._user_id = 0 + self._portrait = '' + self._user_name = '' + return self + + def __str__(self) -> str: + return self._user_name or self._portrait or str(self._user_id) + + def __repr__(self) -> str: + return str( + { + 'user_id': self._user_id, + 'user_name': self._user_name, + 'portrait': self._portrait, + } + ) + + def __eq__(self, obj: "UserInfo_ws") -> bool: + return self._user_id == obj._user_id + + def __hash__(self) -> int: + return self._user_id + + def __int__(self) -> int: + return self._user_id + + def __bool__(self) -> bool: + return bool(self._user_id) + + @property + def user_id(self) -> int: + """ + 用户user_id + + Note: + 唯一 不可变 不可为空 + 请注意与用户个人页的tieba_uid区分 + """ + + return self._user_id + + @property + def portrait(self) -> str: + """ + 用户portrait + + Note: + 唯一 不可变 不可为空 + """ + + return self._portrait + + @property + def user_name(self) -> str: + """ + 用户名 + + Note: + 唯一 可变 可为空 + 请注意与用户昵称区分 + """ + + return self._user_name + + @property + def log_name(self) -> str: + """ + 用于在日志中记录用户信息 + """ + + return self.__str__() + + +class WsMessage(object): + """ + websocket消息 + """ + + __slots__ = [ + '_msg_id', + '_msg_type', + '_text', + '_user', + '_create_time', + ] + + def _init(self, data_proto: TypeMessage) -> "WsMessage": + self._msg_id = data_proto.msgId + self._msg_type = data_proto.msgType + self._text = data_proto.content + self._user = UserInfo_ws()._init(data_proto.userInfo) + self._create_time = data_proto.createTime + return self + + def __repr__(self) -> str: + return str( + { + 'text': self._text, + 'user': self._user, + } + ) + + @property + def msg_id(self) -> int: + """ + 消息id + """ + + return self._msg_id + + @property + def msg_type(self) -> int: + """ + 消息类型 + """ + + return self._msg_type + + @property + def text(self) -> str: + """ + 文本内容 + """ + + return self._text + + @property + def user(self) -> UserInfo_ws: + """ + 发信人的用户信息 + """ + + return self._user + + @property + def create_time(self) -> int: + """ + 发送时间 + + Note: + 10位时间戳 以秒为单位 + """ + + return self._create_time + + +class WsMsgGroup(object): + """ + websocket消息组 + """ __slots__ = [ '_group_type', - '_last_msg_id', + '_group_id', + '_messages', ] - def __init__(self, group_type: int, last_msg_id: int) -> None: - self._group_type = group_type - self._last_msg_id = last_msg_id + def _init(self, data_proto: TypeMessage) -> "WsMsgGroup": + self._group_type = data_proto.groupInfo.groupType + self._group_id = data_proto.groupInfo.groupId + self._messages = [WsMessage()._init(p) for p in data_proto.msgList] + return self def __repr__(self) -> str: return str( { 'group_type': self._group_type, - 'last_msg_id': self._last_msg_id, + 'group_id': self._group_id, + 'messages': self._messages, } ) + @property def group_type(self) -> int: """ 消息组类别 - - Returns: - int """ return self._group_type - def last_msg_id(self) -> int: + @property + def group_id(self) -> int: + """ + 消息组id """ - 最后一条消息的id - Returns: - int + return self._group_id + + @property + def messages(self) -> List[WsMessage]: + """ + 消息列表 """ - return self._last_msg_id + return self._messages diff --git a/aiotieba/client/get_group_msg/protobuf/GetGroupMsgReqIdl.proto b/aiotieba/client/get_group_msg/protobuf/GetGroupMsgReqIdl.proto index 56bc8ce8..ff32e9b2 100644 --- a/aiotieba/client/get_group_msg/protobuf/GetGroupMsgReqIdl.proto +++ b/aiotieba/client/get_group_msg/protobuf/GetGroupMsgReqIdl.proto @@ -1,9 +1,13 @@ -// protobuf.CommitPersonalMsg.CommitPersonalMsgReqIdl +// protobuf.GetGroupMsg.GetGroupMsgReqIdl syntax = "proto3"; message GetGroupMsgReqIdl { string cuid = 1; message DataReq { + int32 width = 1; + int32 height = 2; + int32 smallWidth = 3; + int32 smallHeight = 4; message GroupLastId { int64 groupId = 1; int64 lastMsgId = 2; diff --git a/aiotieba/client/get_group_msg/protobuf/GetGroupMsgReqIdl_pb2.py b/aiotieba/client/get_group_msg/protobuf/GetGroupMsgReqIdl_pb2.py index 49adcba3..39b1e397 100644 --- a/aiotieba/client/get_group_msg/protobuf/GetGroupMsgReqIdl_pb2.py +++ b/aiotieba/client/get_group_msg/protobuf/GetGroupMsgReqIdl_pb2.py @@ -13,7 +13,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x17GetGroupMsgReqIdl.proto\"\xd6\x01\n\x11GetGroupMsgReqIdl\x12\x0c\n\x04\x63uid\x18\x01 \x01(\t\x12(\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x1a.GetGroupMsgReqIdl.DataReq\x1a\x88\x01\n\x07\x44\x61taReq\x12\x39\n\tgroupMids\x18\x06 \x03(\x0b\x32&.GetGroupMsgReqIdl.DataReq.GroupLastId\x12\x0f\n\x07gettype\x18\x07 \x01(\t\x1a\x31\n\x0bGroupLastId\x12\x0f\n\x07groupId\x18\x01 \x01(\x03\x12\x11\n\tlastMsgId\x18\x02 \x01(\x03\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x17GetGroupMsgReqIdl.proto\"\x9e\x02\n\x11GetGroupMsgReqIdl\x12\x0c\n\x04\x63uid\x18\x01 \x01(\t\x12(\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x1a.GetGroupMsgReqIdl.DataReq\x1a\xd0\x01\n\x07\x44\x61taReq\x12\r\n\x05width\x18\x01 \x01(\x05\x12\x0e\n\x06height\x18\x02 \x01(\x05\x12\x12\n\nsmallWidth\x18\x03 \x01(\x05\x12\x13\n\x0bsmallHeight\x18\x04 \x01(\x05\x12\x39\n\tgroupMids\x18\x06 \x03(\x0b\x32&.GetGroupMsgReqIdl.DataReq.GroupLastId\x12\x0f\n\x07gettype\x18\x07 \x01(\t\x1a\x31\n\x0bGroupLastId\x12\x0f\n\x07groupId\x18\x01 \x01(\x03\x12\x11\n\tlastMsgId\x18\x02 \x01(\x03\x62\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'GetGroupMsgReqIdl_pb2', globals()) @@ -21,9 +21,9 @@ DESCRIPTOR._options = None _GETGROUPMSGREQIDL._serialized_start=28 - _GETGROUPMSGREQIDL._serialized_end=242 + _GETGROUPMSGREQIDL._serialized_end=314 _GETGROUPMSGREQIDL_DATAREQ._serialized_start=106 - _GETGROUPMSGREQIDL_DATAREQ._serialized_end=242 - _GETGROUPMSGREQIDL_DATAREQ_GROUPLASTID._serialized_start=193 - _GETGROUPMSGREQIDL_DATAREQ_GROUPLASTID._serialized_end=242 + _GETGROUPMSGREQIDL_DATAREQ._serialized_end=314 + _GETGROUPMSGREQIDL_DATAREQ_GROUPLASTID._serialized_start=265 + _GETGROUPMSGREQIDL_DATAREQ_GROUPLASTID._serialized_end=314 # @@protoc_insertion_point(module_scope) diff --git a/aiotieba/client/get_group_msg/protobuf/GetGroupMsgResIdl.proto b/aiotieba/client/get_group_msg/protobuf/GetGroupMsgResIdl.proto index c7c50f9a..96a6b91a 100644 --- a/aiotieba/client/get_group_msg/protobuf/GetGroupMsgResIdl.proto +++ b/aiotieba/client/get_group_msg/protobuf/GetGroupMsgResIdl.proto @@ -1,8 +1,33 @@ -// protobuf.CommitPersonalMsg.CommitPersonalMsgResIdl +// protobuf.GetGroupMsg.GetGroupMsgResIdl syntax = "proto3"; import "Error.proto"; message GetGroupMsgResIdl { Error error = 1; + message DataRes { + message GroupMsg { + message GroupInfo { + int64 groupId = 1; + int32 groupType = 20; + } + GroupInfo groupInfo = 1; + message MsgInfo { + int64 msgId = 1; + int32 msgType = 3; + string content = 5; + int32 createTime = 8; + message UserInfo { + int64 userId = 1; + string userName = 2; + string portrait = 4; + // string userNameShow = 18; + } + UserInfo userInfo = 10; + } + repeated MsgInfo msgList = 2; + } + repeated GroupMsg groupInfo = 1; + } + DataRes data = 2; } diff --git a/aiotieba/client/get_group_msg/protobuf/GetGroupMsgResIdl_pb2.py b/aiotieba/client/get_group_msg/protobuf/GetGroupMsgResIdl_pb2.py index 6bdd55ee..7d3879a5 100644 --- a/aiotieba/client/get_group_msg/protobuf/GetGroupMsgResIdl_pb2.py +++ b/aiotieba/client/get_group_msg/protobuf/GetGroupMsgResIdl_pb2.py @@ -14,13 +14,23 @@ from ..._protobuf import Error_pb2 as Error__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x17GetGroupMsgResIdl.proto\x1a\x0b\x45rror.proto\"*\n\x11GetGroupMsgResIdl\x12\x15\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x06.Errorb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x17GetGroupMsgResIdl.proto\x1a\x0b\x45rror.proto\"\xaf\x04\n\x11GetGroupMsgResIdl\x12\x15\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x06.Error\x12(\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x1a.GetGroupMsgResIdl.DataRes\x1a\xd8\x03\n\x07\x44\x61taRes\x12\x36\n\tgroupInfo\x18\x01 \x03(\x0b\x32#.GetGroupMsgResIdl.DataRes.GroupMsg\x1a\x94\x03\n\x08GroupMsg\x12@\n\tgroupInfo\x18\x01 \x01(\x0b\x32-.GetGroupMsgResIdl.DataRes.GroupMsg.GroupInfo\x12<\n\x07msgList\x18\x02 \x03(\x0b\x32+.GetGroupMsgResIdl.DataRes.GroupMsg.MsgInfo\x1a/\n\tGroupInfo\x12\x0f\n\x07groupId\x18\x01 \x01(\x03\x12\x11\n\tgroupType\x18\x14 \x01(\x05\x1a\xd6\x01\n\x07MsgInfo\x12\r\n\x05msgId\x18\x01 \x01(\x03\x12\x0f\n\x07msgType\x18\x03 \x01(\x05\x12\x0f\n\x07\x63ontent\x18\x05 \x01(\t\x12\x12\n\ncreateTime\x18\x08 \x01(\x05\x12\x46\n\x08userInfo\x18\n \x01(\x0b\x32\x34.GetGroupMsgResIdl.DataRes.GroupMsg.MsgInfo.UserInfo\x1a>\n\x08UserInfo\x12\x0e\n\x06userId\x18\x01 \x01(\x03\x12\x10\n\x08userName\x18\x02 \x01(\t\x12\x10\n\x08portrait\x18\x04 \x01(\tb\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'GetGroupMsgResIdl_pb2', globals()) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _GETGROUPMSGRESIDL._serialized_start=40 - _GETGROUPMSGRESIDL._serialized_end=82 + _GETGROUPMSGRESIDL._serialized_start=41 + _GETGROUPMSGRESIDL._serialized_end=600 + _GETGROUPMSGRESIDL_DATARES._serialized_start=128 + _GETGROUPMSGRESIDL_DATARES._serialized_end=600 + _GETGROUPMSGRESIDL_DATARES_GROUPMSG._serialized_start=196 + _GETGROUPMSGRESIDL_DATARES_GROUPMSG._serialized_end=600 + _GETGROUPMSGRESIDL_DATARES_GROUPMSG_GROUPINFO._serialized_start=336 + _GETGROUPMSGRESIDL_DATARES_GROUPMSG_GROUPINFO._serialized_end=383 + _GETGROUPMSGRESIDL_DATARES_GROUPMSG_MSGINFO._serialized_start=386 + _GETGROUPMSGRESIDL_DATARES_GROUPMSG_MSGINFO._serialized_end=600 + _GETGROUPMSGRESIDL_DATARES_GROUPMSG_MSGINFO_USERINFO._serialized_start=538 + _GETGROUPMSGRESIDL_DATARES_GROUPMSG_MSGINFO_USERINFO._serialized_end=600 # @@protoc_insertion_point(module_scope) diff --git a/aiotieba/client/init_websocket/__init__.py b/aiotieba/client/init_websocket/__init__.py new file mode 100644 index 00000000..6ec67c4f --- /dev/null +++ b/aiotieba/client/init_websocket/__init__.py @@ -0,0 +1,2 @@ +from ._api import CMD, pack_proto, parse_body +from ._classdef import WsMsgGroupInfo diff --git a/aiotieba/client/_websocket/_api.py b/aiotieba/client/init_websocket/_api.py similarity index 87% rename from aiotieba/client/_websocket/_api.py rename to aiotieba/client/init_websocket/_api.py index b6672d12..54ce167a 100644 --- a/aiotieba/client/_websocket/_api.py +++ b/aiotieba/client/init_websocket/_api.py @@ -4,7 +4,7 @@ from .._core import TbCore from .._exception import TiebaServerError from .._helper import jsonlib -from ..get_group_msg import MsgGroup +from ._classdef import WsMsgGroupInfo from .protobuf import UpdateClientInfoReqIdl_pb2, UpdateClientInfoResIdl_pb2 CMD = 1001 @@ -31,13 +31,13 @@ def pack_proto(core: TbCore, secret_key: str) -> bytes: return req_proto.SerializeToString() -def parse_body(body: bytes) -> List[MsgGroup]: +def parse_body(body: bytes) -> List[WsMsgGroupInfo]: res_proto = UpdateClientInfoResIdl_pb2.UpdateClientInfoResIdl() res_proto.ParseFromString(body) if code := res_proto.error.errorno: raise TiebaServerError(code, res_proto.error.errmsg) - groups = [MsgGroup(g.groupType, g.lastMsgId) for g in res_proto.data.groupInfo] + groups = [WsMsgGroupInfo()._init(p) for p in res_proto.data.groupInfo] return groups diff --git a/aiotieba/client/init_websocket/_classdef.py b/aiotieba/client/init_websocket/_classdef.py new file mode 100644 index 00000000..9395bcdc --- /dev/null +++ b/aiotieba/client/init_websocket/_classdef.py @@ -0,0 +1,41 @@ +from .._classdef import TypeMessage + + +class WsMsgGroupInfo(object): + """ + websocket消息组的相关信息 + """ + + __slots__ = [ + '_group_type', + '_last_msg_id', + ] + + def _init(self, data_proto: TypeMessage) -> "WsMsgGroupInfo": + self._group_type = data_proto.groupType + self._last_msg_id = data_proto.lastMsgId + return self + + def __repr__(self) -> str: + return str( + { + 'group_type': self._group_type, + 'last_msg_id': self._last_msg_id, + } + ) + + @property + def group_type(self) -> int: + """ + 消息组类别 + """ + + return self._group_type + + @property + def last_msg_id(self) -> int: + """ + 最后一条消息的id + """ + + return self._last_msg_id diff --git a/aiotieba/client/_websocket/protobuf/UpdateClientInfoReqIdl.proto b/aiotieba/client/init_websocket/protobuf/UpdateClientInfoReqIdl.proto similarity index 100% rename from aiotieba/client/_websocket/protobuf/UpdateClientInfoReqIdl.proto rename to aiotieba/client/init_websocket/protobuf/UpdateClientInfoReqIdl.proto diff --git a/aiotieba/client/_websocket/protobuf/UpdateClientInfoReqIdl_pb2.py b/aiotieba/client/init_websocket/protobuf/UpdateClientInfoReqIdl_pb2.py similarity index 100% rename from aiotieba/client/_websocket/protobuf/UpdateClientInfoReqIdl_pb2.py rename to aiotieba/client/init_websocket/protobuf/UpdateClientInfoReqIdl_pb2.py diff --git a/aiotieba/client/_websocket/protobuf/UpdateClientInfoResIdl.proto b/aiotieba/client/init_websocket/protobuf/UpdateClientInfoResIdl.proto similarity index 100% rename from aiotieba/client/_websocket/protobuf/UpdateClientInfoResIdl.proto rename to aiotieba/client/init_websocket/protobuf/UpdateClientInfoResIdl.proto diff --git a/aiotieba/client/_websocket/protobuf/UpdateClientInfoResIdl_pb2.py b/aiotieba/client/init_websocket/protobuf/UpdateClientInfoResIdl_pb2.py similarity index 100% rename from aiotieba/client/_websocket/protobuf/UpdateClientInfoResIdl_pb2.py rename to aiotieba/client/init_websocket/protobuf/UpdateClientInfoResIdl_pb2.py diff --git a/aiotieba/client/push_notify/__init__.py b/aiotieba/client/push_notify/__init__.py index fe50ea1a..5297ed88 100644 --- a/aiotieba/client/push_notify/__init__.py +++ b/aiotieba/client/push_notify/__init__.py @@ -1 +1,2 @@ from ._api import CMD, parse_body +from ._classdef import WsNotify diff --git a/aiotieba/client/push_notify/_api.py b/aiotieba/client/push_notify/_api.py index 75dc4713..37fdb0ca 100644 --- a/aiotieba/client/push_notify/_api.py +++ b/aiotieba/client/push_notify/_api.py @@ -1,14 +1,15 @@ -from ..get_group_msg import MsgGroup +from typing import List + +from ._classdef import WsNotify from .protobuf import PushNotifyResIdl_pb2 CMD = 202006 -def parse_body(body: bytes) -> MsgGroup: +def parse_body(body: bytes) -> List[WsNotify]: res_proto = PushNotifyResIdl_pb2.PushNotifyResIdl() res_proto.ParseFromString(body) - group_proto = res_proto.multiMsg[0].data - group = MsgGroup(group_proto.groupId, group_proto.msgId) + notifies = [WsNotify()._init(p) for p in res_proto.multiMsg] - return group + return notifies diff --git a/aiotieba/client/push_notify/_classdef.py b/aiotieba/client/push_notify/_classdef.py new file mode 100644 index 00000000..48149819 --- /dev/null +++ b/aiotieba/client/push_notify/_classdef.py @@ -0,0 +1,75 @@ +from .._classdef import TypeMessage + + +class WsNotify(object): + """ + websocket主动推送消息提醒 + """ + + __slots__ = [ + '_group_type', + '_group_id', + '_last_msg_id', + '_create_time', + ] + + def _init(self, data_proto: TypeMessage) -> "WsNotify": + data_proto = data_proto.data + self._group_type = data_proto.groupType + self._group_id = data_proto.groupId + self._last_msg_id = data_proto.msgId + self._create_time = str(create_time) if (create_time := data_proto.et) else 0 + return self + + def __repr__(self) -> str: + return str( + { + 'group_type': self._group_type, + 'group_id': self._group_id, + 'last_msg_id': self._last_msg_id, + } + ) + + @property + def group_type(self) -> int: + """ + 消息组类别 + + Returns: + int + """ + + return self._group_type + + @property + def group_id(self) -> int: + """ + 消息组id + + Returns: + int + """ + + return self._group_id + + @property + def last_msg_id(self) -> int: + """ + 最后一条消息的id + + Returns: + int + """ + + return self._last_msg_id + + @property + def create_time(self) -> int: + """ + 推送时间 + + Note: + 10位时间戳 以秒为单位 + """ + + return self._create_time diff --git a/aiotieba/client/push_notify/protobuf/PushNotifyResIdl.proto b/aiotieba/client/push_notify/protobuf/PushNotifyResIdl.proto index 9d63799f..f766d560 100644 --- a/aiotieba/client/push_notify/protobuf/PushNotifyResIdl.proto +++ b/aiotieba/client/push_notify/protobuf/PushNotifyResIdl.proto @@ -1,4 +1,4 @@ -// protobuf.CommitPersonalMsg.CommitPersonalMsgResIdl +// protobuf.PushNotify.PushNotifyResIdl syntax = "proto3"; message PushNotifyResIdl { @@ -6,6 +6,8 @@ message PushNotifyResIdl { message PusherMsgInfo { int64 groupId = 1; int64 msgId = 2; + string et = 6; + int32 groupType = 7; } PusherMsgInfo data = 2; } diff --git a/aiotieba/client/push_notify/protobuf/PushNotifyResIdl_pb2.py b/aiotieba/client/push_notify/protobuf/PushNotifyResIdl_pb2.py index 65b17dbb..8208d285 100644 --- a/aiotieba/client/push_notify/protobuf/PushNotifyResIdl_pb2.py +++ b/aiotieba/client/push_notify/protobuf/PushNotifyResIdl_pb2.py @@ -13,7 +13,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16PushNotifyResIdl.proto\"\xb8\x01\n\x10PushNotifyResIdl\x12-\n\x08multiMsg\x18\x02 \x03(\x0b\x32\x1b.PushNotifyResIdl.PusherMsg\x1au\n\tPusherMsg\x12\x37\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32).PushNotifyResIdl.PusherMsg.PusherMsgInfo\x1a/\n\rPusherMsgInfo\x12\x0f\n\x07groupId\x18\x01 \x01(\x03\x12\r\n\x05msgId\x18\x02 \x01(\x03\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16PushNotifyResIdl.proto\"\xd8\x01\n\x10PushNotifyResIdl\x12-\n\x08multiMsg\x18\x02 \x03(\x0b\x32\x1b.PushNotifyResIdl.PusherMsg\x1a\x94\x01\n\tPusherMsg\x12\x37\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32).PushNotifyResIdl.PusherMsg.PusherMsgInfo\x1aN\n\rPusherMsgInfo\x12\x0f\n\x07groupId\x18\x01 \x01(\x03\x12\r\n\x05msgId\x18\x02 \x01(\x03\x12\n\n\x02\x65t\x18\x06 \x01(\t\x12\x11\n\tgroupType\x18\x07 \x01(\x05\x62\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'PushNotifyResIdl_pb2', globals()) @@ -21,9 +21,9 @@ DESCRIPTOR._options = None _PUSHNOTIFYRESIDL._serialized_start=27 - _PUSHNOTIFYRESIDL._serialized_end=211 - _PUSHNOTIFYRESIDL_PUSHERMSG._serialized_start=94 - _PUSHNOTIFYRESIDL_PUSHERMSG._serialized_end=211 - _PUSHNOTIFYRESIDL_PUSHERMSG_PUSHERMSGINFO._serialized_start=164 - _PUSHNOTIFYRESIDL_PUSHERMSG_PUSHERMSGINFO._serialized_end=211 + _PUSHNOTIFYRESIDL._serialized_end=243 + _PUSHNOTIFYRESIDL_PUSHERMSG._serialized_start=95 + _PUSHNOTIFYRESIDL_PUSHERMSG._serialized_end=243 + _PUSHNOTIFYRESIDL_PUSHERMSG_PUSHERMSGINFO._serialized_start=165 + _PUSHNOTIFYRESIDL_PUSHERMSG_PUSHERMSGINFO._serialized_end=243 # @@protoc_insertion_point(module_scope) diff --git a/aiotieba/client/_websocket/__init__.py b/aiotieba/client/websocket/__init__.py similarity index 100% rename from aiotieba/client/_websocket/__init__.py rename to aiotieba/client/websocket/__init__.py diff --git a/aiotieba/client/_websocket/_classdef.py b/aiotieba/client/websocket/_classdef.py similarity index 51% rename from aiotieba/client/_websocket/_classdef.py rename to aiotieba/client/websocket/_classdef.py index c65f8827..32319170 100644 --- a/aiotieba/client/_websocket/_classdef.py +++ b/aiotieba/client/websocket/_classdef.py @@ -1,8 +1,7 @@ import asyncio import binascii import time -import weakref -from typing import Awaitable, Callable, Dict, Optional +from typing import Any, Awaitable, Callable, Dict, Optional import aiohttp import async_timeout @@ -11,22 +10,94 @@ from Crypto.PublicKey import RSA from .._core import TbCore -from ._helper import pack_ws_bytes, parse_ws_bytes +from .._helper import pack_ws_bytes, parse_ws_bytes -_REQ_ID = None +TypeWebsocketCallback = Callable[[bytes, int], Awaitable[None]] -TypeWebsocketCallback = Callable[[bytes, int], Awaitable[None]] +class WsResponse(object): + """ + websocket响应 + + Args: + data_future (asyncio.Future): 用于等待读事件到来的Future + """ + + __slots__ = ['_future'] + + def __init__(self, data_future: asyncio.Future) -> None: + self._future = data_future + + def _cancel(self) -> None: + self._future.cancel() + + async def read(self, read_timeout: float = 5.0) -> bytes: + """ + 读取websocket响应 + + Args: + read_timeout (float, optional): 读取超时时间. Defaults to 5.0. + + Returns: + bytes + """ + + with async_timeout.timeout(read_timeout): + return await self._future -async def _default_callback(ws: "Websocket", data: bytes, req_id: int) -> None: +class WsWaiter(object): """ - 接收到消息时触发的默认回调 + websocket事件等待器 + + Args: + loop (asyncio.AbstractEventLoop): 事件循环 """ - res_future = ws._res_waiter.get(req_id, None) - if res_future: - res_future.set_result(data) + __slots__ = [ + '_loop', + '_waiter', + ] + + def __init__(self, loop: asyncio.AbstractEventLoop) -> None: + self._loop = loop + self._waiter: Dict[int, asyncio.Future] = {} + + def __get_callback(self, req_id: int) -> Callable[[asyncio.Future], Any]: + def done_callback(_): + del self._waiter[req_id] + + return done_callback + + def register(self, req_id: int) -> WsResponse: + """ + 将一个req_id注册到等待器 此方法会创建Future对象 + + Args: + req_id (int): 请求id + + Returns: + WsResponse: websocket响应 + """ + + data_future = self._loop.create_future() + data_future.add_done_callback(self.__get_callback(req_id)) + self._waiter[req_id] = data_future + return WsResponse(data_future) + + def set_done(self, req_id: int, data: bytes) -> None: + """ + 将req_id对应的Future设置为已完成 + + Args: + req_id (int): 请求id + data (bytes): 填入的数据 + """ + + data_future = self._waiter.get(req_id, None) + if data_future is None: + return + data_future.set_result(data) class Websocket(object): @@ -40,9 +111,10 @@ class Websocket(object): __slots__ = [ '_core', + '_req_id', '_record_id', - '_res_waiter', - '_callback', + '_waiter', + 'callback', '_client_ws', '_websocket', '_ws_dispatcher', @@ -50,13 +122,10 @@ class Websocket(object): def __init__(self, connector: aiohttp.TCPConnector, core: TbCore) -> None: self._core = core + self._req_id = int(time.time()) self._record_id = None - self._res_waiter = weakref.WeakValueDictionary() - self._callback: Dict[int, TypeWebsocketCallback] = {} - - global _REQ_ID - if _REQ_ID is None: - _REQ_ID = int(time.time()) + self._waiter = WsWaiter(core._loop) + self.callback: Dict[int, TypeWebsocketCallback] = {} timeout = aiohttp.ClientTimeout(connect=3.0, sock_read=12.0, sock_connect=3.2) ws_headers = { @@ -72,7 +141,7 @@ def __init__(self, connector: aiohttp.TCPConnector, core: TbCore) -> None: connector_owner=False, raise_for_status=True, timeout=timeout, - read_bufsize=256 * 1024, # 256KiB + read_bufsize=64 * 1024, ) self._websocket: aiohttp.ClientWebSocketResponse = None self._ws_dispatcher: asyncio.Task = None @@ -83,21 +152,23 @@ async def close(self) -> None: if not self._ws_dispatcher.done(): self._ws_dispatcher.cancel() - async def __ws_dispatch(self) -> None: - """ - 分发从贴吧websocket接收到的数据 - """ + def __default_callback(self, req_id: int, data: bytes) -> None: + self._waiter.set_done(req_id, data) + async def __ws_dispatch(self) -> None: try: async for msg in self._websocket: data, cmd, req_id = parse_ws_bytes(self._core, msg.data) - res_callback = self._callback.get(cmd, _default_callback) - await res_callback(self, data, req_id) + res_callback = self.callback.get(cmd, None) + if res_callback is None: + self.__default_callback(req_id, data) + else: + self._core._loop.create_task(res_callback(self, data, req_id)) except asyncio.CancelledError: return - async def __create_websocket(self, heartbeat: Optional[float] = 800.0) -> None: + async def _create_websocket(self, heartbeat: Optional[float] = 800.0) -> None: """ 建立weboscket连接 @@ -126,13 +197,12 @@ async def init_websocket(self) -> None: 初始化weboscket连接对象并发送初始化信息 Raises: - TiebaServerError: 服务端返回错误 + aiohttp.WSServerHandshakeError: websocket握手失败 + asyncio.TimeoutError: 发送初始化信息超时 """ if not self.is_aviliable: - await self.__create_websocket(heartbeat=None) - - from . import _api + await self._create_websocket(heartbeat=None) pub_key = binascii.a2b_base64( b"MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwQpwBZxXJV/JVRF/uNfyMSdu7YWwRNLM8+2xbniGp2iIQHOikPpTYQjlQgMi1uvq1kZpJ32rHo3hkwjy2l0lFwr3u4Hk2Wk7vnsqYQjAlYlK0TCzjpmiI+OiPOUNVtbWHQiLiVqFtzvpvi4AU7C1iKGvc/4IS45WjHxeScHhnZZ7njS4S1UgNP/GflRIbzgbBhyZ9kEW5/OO5YfG1fy6r4KSlDJw4o/mw5XhftyIpL+5ZBVBC6E1EIiP/dd9AbK62VV1PByfPMHMixpxI3GM2qwcmFsXcCcgvUXJBa9k6zP8dDQ3csCM2QNT+CQAOxthjtp/TFWaD7MzOdsIYb3THwIDAQAB" @@ -141,12 +211,14 @@ async def init_websocket(self) -> None: rsa_chiper = PKCS1_v1_5.new(pub_key) secret_key = rsa_chiper.encrypt(self._core.aes_ecb_sec_key) - proto = _api.pack_proto(self._core, secret_key) - body = await self.send(proto, cmd=_api.CMD, compress=False, encrypt=False, timeout=5.0) - groups = _api.parse_body(body) + from .. import get_group_msg, init_websocket + + proto = init_websocket.pack_proto(self._core, secret_key) + resp = await self.send(proto, cmd=init_websocket.CMD, compress=False, encrypt=False, send_timeout=5.0) + groups = init_websocket.parse_body(await resp.read()) for group in groups: - if group._group_type == 6: # 私信组别类型为6 + if group._group_type == get_group_msg.GroupType.PRIVATE_MSG: self._record_id = group._last_msg_id * 100 @property @@ -154,8 +226,8 @@ def is_aviliable(self) -> bool: """ websocket是否可用 - Returns: - bool: True则websocket可用 反之不可用 + Note: + True则websocket可用 反之不可用 """ return not (self._websocket is None or self._websocket.closed or self._websocket._writer.transport.is_closing()) @@ -165,15 +237,29 @@ def record_id(self) -> int: """ 用作请求参数的记录id - Returns: - int + Note: + 每次调用都会使其自增1 """ + self._record_id += 1 return self._record_id + @property + def req_id(self) -> int: + """ + 用作请求参数的id + + Note: + 每个websocket请求都有一个唯一的req_id + 每次调用都会使其自增1 + """ + + self._req_id += 1 + return self._req_id + async def send( - self, data: bytes, cmd: int, *, compress: bool = False, encrypt: bool = True, timeout: float - ) -> bytes: + self, data: bytes, cmd: int, *, compress: bool = False, encrypt: bool = True, send_timeout: float = 3.0 + ) -> WsResponse: """ 将protobuf序列化结果打包发送 @@ -182,28 +268,24 @@ async def send( cmd (int): 请求的cmd类型 compress (bool, optional): 是否需要gzip压缩. Defaults to False. encrypt (bool, optional): 是否需要aes加密. Defaults to True. + send_timeout (float, optional): 发送超时时间. Defaults to 3.0. Returns: - bytes: 响应 - """ + WsResponse: websocket响应对象 - global _REQ_ID - _REQ_ID += 1 - req_id = _REQ_ID + Raises: + asyncio.TimeoutError: 发送超时 + """ + req_id = self.req_id req_data = pack_ws_bytes(self._core, data, cmd, req_id, compress=compress, encrypt=encrypt) - - res_future = asyncio.Future() - self._res_waiter[req_id] = res_future - - await self._websocket.send_bytes(req_data) + response = self._waiter.register(req_id) try: - with async_timeout.timeout(timeout): - res_data = await res_future - except asyncio.TimeoutError: - del self._res_waiter[req_id] - raise asyncio.TimeoutError("Timeout to read") + with async_timeout.timeout(send_timeout): + await self._websocket.send_bytes(req_data) + except asyncio.TimeoutError as err: + response._cancel() + raise asyncio.TimeoutError("Timeout to read") from err else: - del self._res_waiter[req_id] - return res_data + return response