forked from oischinger/eufyp2pstream
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwebsocket.py
83 lines (70 loc) · 2.89 KB
/
websocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import logging
import asyncio
import aiohttp
import traceback
from typing import Any, Coroutine, Text
from typing import Callable
# Module-level logger, named after the package this module belongs to.
_LOGGER: logging.Logger = logging.getLogger(__package__)
class EufySecurityWebSocket:
    """Asyncio client for a WebSocket endpoint at ``ws://host:port``.

    The connection is opened through the supplied aiohttp session.  Incoming
    frames are pumped by a background task and handed to ``message_callback``;
    the other callbacks are awaited on open, close and error.  Every callback
    may be ``None``, in which case that notification is skipped.
    """

    def __init__(
        self,
        host: Text,
        port: int,
        session: aiohttp.ClientSession,
        open_callback: Callable[[], Coroutine[Any, Any, None]],
        message_callback: Callable[[Any], Coroutine[Any, Any, None]],
        close_callback: Callable[[], Coroutine[Any, Any, None]],
        error_callback: Callable[[Text], Coroutine[Any, Any, None]],
    ):
        """Store the connection parameters; no network I/O happens here.

        Args:
            host: Host name or address of the WebSocket server.
            port: Port of the WebSocket server.  It is only interpolated
                into the URL, so an int or a numeric string both work.
            session: aiohttp session used by ``connect`` to open the socket.
            open_callback: Awaited by ``async_on_open`` once the socket is
                open.
            message_callback: Awaited with each message received.
            close_callback: Scheduled when the message pump finishes.
            error_callback: Scheduled with the error text by ``on_error``.
        """
        self.host = host
        self.port = port
        self.session = session
        self.open_callback = open_callback
        self.message_callback = message_callback
        self.close_callback = close_callback
        self.error_callback = error_callback
        self.base = f"ws://{self.host}:{self.port}"
        self.ws: aiohttp.ClientWebSocketResponse = None
        self.loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
        # Strong reference to the message-pump task (see ``connect``).
        self._reader_task = None

    async def connect(self):
        """Open the WebSocket, start the message pump and fire the open callback."""
        print(" - set_ws - connect")
        # Bind to the loop that is actually running this coroutine.  The loop
        # captured in ``__init__`` may be a different (idle) one when the
        # object was built before the loop started, and a pump scheduled
        # there would never run.
        self.loop = asyncio.get_running_loop()
        self.ws = await self.session.ws_connect(
            self.base, autoclose=False, autoping=True, heartbeat=60
        )
        # The event loop only keeps weak references to tasks, so hold one
        # here to keep the pump from being garbage-collected mid-flight.
        self._reader_task = self.loop.create_task(self.process_messages())
        self._reader_task.add_done_callback(self.on_close)
        await self.async_on_open()

    async def async_on_open(self) -> None:
        """Await ``open_callback`` if the socket is open and a callback is set."""
        if self.ws.closed or self.open_callback is None:
            return
        await self.open_callback()

    async def process_messages(self):
        """Forward every incoming message to ``on_message`` until the stream ends.

        A failure while handling one message is logged and does not stop the
        pump.
        """
        print(" - process_messages started")
        async for msg in self.ws:
            try:
                await self.on_message(msg)
            except Exception as ex:  # pylint: disable=broad-except
                _LOGGER.error(
                    " - Exception - process_messages: %s - traceback: %s - message: %s",
                    ex,
                    traceback.format_exc(),
                    msg,
                )

    async def on_message(self, message):
        """Await ``message_callback`` with ``message`` if a callback is set."""
        if self.message_callback is not None:
            await self.message_callback(message)

    def on_error(self, error: Text = "Unspecified") -> None:
        """Report ``error`` and run ``error_callback`` on the socket's loop.

        When called from another thread this blocks until the callback has
        finished and re-raises whatever it raised.  When called on the loop
        thread itself the callback is only scheduled: waiting for it there
        would block the very loop that has to run it.
        """
        print(f" - WebSocket Error: {error}")
        if self.error_callback is None:
            return
        try:
            on_loop_thread = asyncio.get_running_loop() is self.loop
        except RuntimeError:
            # No loop is running in this thread.
            on_loop_thread = False
        pending = asyncio.run_coroutine_threadsafe(
            self.error_callback(error), self.loop
        )
        if not on_loop_thread:
            pending.result()

    def on_close(self, future="") -> None:
        """Drop the socket and schedule ``close_callback``.

        Registered in ``connect`` as the done-callback of the message pump,
        so ``future`` is normally the finished pump task.
        """
        print(f" - WebSocket Connection Closed. {future}")
        print(f" - WebSocket Connection Closed. {self.close_callback}")
        # Always forget the finished connection, callback or not, so a stale
        # socket is never left behind on the instance.
        self.ws = None
        self._reader_task = None
        if self.close_callback is not None:
            asyncio.run_coroutine_threadsafe(self.close_callback(), self.loop)

    async def send_message(self, message):
        """Send ``message`` as a text frame.

        ``connect`` must have completed first; ``self.ws`` is ``None`` before
        that and again after ``on_close`` has run.
        """
        await self.ws.send_str(message)