Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 35 additions & 64 deletions broadlink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

from .exceptions import check_error, exception

def gendevice(devtype, host, mac, name=None, cloud=None):
devices = {
Expand Down Expand Up @@ -221,10 +222,7 @@ def auth(self):
payload[0x36] = ord('1')

response = self.send_packet(0x65, payload)

if any(response[0x22:0x24]):
return False

check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])

key = payload[0x04:0x14]
Expand Down Expand Up @@ -301,7 +299,7 @@ def send_packet(self, command, payload):
break
except socket.timeout:
if (time.time() - start_time) > self.timeout:
raise
raise exception(0xfffd)
finally:
cs.close()
return bytearray(response[0])
Expand All @@ -328,7 +326,8 @@ def set_power_mask(self, sid_mask, state):
packet[0x0d] = sid_mask
packet[0x0e] = sid_mask if state else 0

self.send_packet(0x6a, packet)
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])

def set_power(self, sid, state):
"""Sets the power state of the smart power strip."""
Expand All @@ -348,9 +347,7 @@ def check_power_raw(self):
packet[0x08] = 0x01

response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int):
state = payload[0x0e]
Expand Down Expand Up @@ -384,6 +381,7 @@ def get_state(self):
eg. `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`"""
packet = self._encode(1, b'{}')
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
return self._decode(response)

def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktime1=None, maxworktime2=None, idcbrightness=None):
Expand All @@ -405,6 +403,7 @@ def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktim
js = json.dumps(data).encode('utf8')
packet = self._encode(2, js)
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
return self._decode(response)

def _encode(self, flag, js):
Expand Down Expand Up @@ -432,10 +431,6 @@ def _encode(self, flag, js):
return packet

def _decode(self, response):
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None

payload = self.decrypt(bytes(response[0x38:]))
js_len = struct.unpack_from('<I', payload, 0x0a)[0]
state = json.loads(payload[0x0e:0x0e+js_len])
Expand All @@ -449,7 +444,8 @@ def __init__(self, *args, **kwargs):
def set_power(self, state):
packet = bytearray(4)
packet[0] = state
self.send_packet(0x66, packet)
response = self.send_packet(0x66, packet)
check_error(response[0x22:0x24])


class sp2(device):
Expand All @@ -465,7 +461,8 @@ def set_power(self, state):
packet[4] = 3 if state else 2
else:
packet[4] = 1 if state else 0
self.send_packet(0x6a, packet)
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])

def set_nightlight(self, state):
"""Sets the night light state of the smart plug"""
Expand All @@ -475,16 +472,15 @@ def set_nightlight(self, state):
packet[4] = 3 if state else 1
else:
packet[4] = 2 if state else 0
self.send_packet(0x6a, packet)
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])

def check_power(self):
"""Returns the power state of the smart plug."""
packet = bytearray(16)
packet[0] = 1
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int):
return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD)
Expand All @@ -495,9 +491,7 @@ def check_nightlight(self):
packet = bytearray(16)
packet[0] = 1
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int):
return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF)
Expand All @@ -506,9 +500,7 @@ def check_nightlight(self):
def get_energy(self):
packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45])
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x7], int):
energy = int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0
Expand All @@ -527,9 +519,7 @@ def check_sensors(self):
packet = bytearray(16)
packet[0] = 1
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None
check_error(response[0x22:0x24])
data = {}
payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int):
Expand Down Expand Up @@ -578,9 +568,7 @@ def check_sensors_raw(self):
packet = bytearray(16)
packet[0] = 1
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None
check_error(response[0x22:0x24])
data = {}
payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int):
Expand Down Expand Up @@ -609,40 +597,40 @@ def check_data(self):
packet = bytearray(self._request_header)
packet.append(0x04)
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
return payload[len(self._request_header) + 4:]

def send_data(self, data):
packet = bytearray(self._code_sending_header)
packet += bytes([0x02, 0x00, 0x00, 0x00])
packet += data
self.send_packet(0x6a, packet)
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])

def enter_learning(self):
packet = bytearray(self._request_header)
packet.append(0x03)
self.send_packet(0x6a, packet)
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])

def sweep_frequency(self):
packet = bytearray(self._request_header)
packet.append(0x19)
self.send_packet(0x6a, packet)
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])

def cancel_sweep_frequency(self):
packet = bytearray(self._request_header)
packet.append(0x1e)
self.send_packet(0x6a, packet)
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])

def check_frequency(self):
packet = bytearray(self._request_header)
packet.append(0x1a)
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return False
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
if payload[len(self._request_header) + 4] == 1:
return True
Expand All @@ -652,9 +640,7 @@ def find_rf_packet(self):
packet = bytearray(self._request_header)
packet.append(0x1b)
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return False
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
if payload[len(self._request_header) + 4] == 1:
return True
Expand All @@ -664,9 +650,7 @@ def _read_sensor(self, type, offset, divider):
packet = bytearray(self._request_header)
packet.append(type)
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return False
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
value_pos = len(self._request_header) + offset
if isinstance(payload[value_pos], int):
Expand Down Expand Up @@ -767,12 +751,7 @@ def send_request(self, input_payload):

# send to device
response = self.send_packet(0x6a, request_payload)

# check for error
err = response[0x22] | (response[0x23] << 8)
if err:
raise ValueError('broadlink_response_error', err)

check_error(response[0x22:0x24])
response_payload = bytearray(self.decrypt(bytes(response[0x38:])))

# experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc)
Expand Down Expand Up @@ -940,10 +919,7 @@ def get_sensors_status(self):
packet = bytearray(16)
packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None

check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
if not payload:
return None
Expand Down Expand Up @@ -991,9 +967,7 @@ def _send(self, magic1, magic2):
packet[9] = 0xfa
packet[10] = 0x44
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
return ord(payload[4])

Expand Down Expand Up @@ -1059,10 +1033,7 @@ def send_command(self,command, type = 'set'):
packet[0x07] = checksum >> 8 # Checksum 2 position

response = self.send_packet(0x6a, packet)

err = response[0x36] | (response[0x37] << 8)
if err != 0:
return None
check_error(response[0x36:0x38])
payload = self.decrypt(bytes(response[0x38:]))

responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8)
Expand Down
97 changes: 97 additions & 0 deletions broadlink/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Exceptions for Broadlink devices."""


class BroadlinkException(Exception):
"""Common base class for all Broadlink exceptions."""
pass


class AuthenticationError(BroadlinkException):
"""Authentication error."""
pass


class AuthorizationError(BroadlinkException):
"""Authorization error."""
pass


class CommandNotSupportedError(BroadlinkException):
"""Command not supported error."""
pass


class ConnectionClosedError(BroadlinkException):
"""Connection closed error."""
pass


class DataValidationError(BroadlinkException):
"""Data validation error."""
pass


class DeviceOfflineError(BroadlinkException):
"""Device offline error."""
pass


class ReadError(BroadlinkException):
"""Read error."""
pass


class SendError(BroadlinkException):
"""Send error."""
pass


class SSIDNotFoundError(BroadlinkException):
"""SSID not found error."""
pass


class StorageError(BroadlinkException):
"""Storage error."""
pass


class UnknownError(BroadlinkException):
"""Unknown error."""
pass


class WriteError(BroadlinkException):
"""Write error."""
pass


FIRMWARE_ERRORS = {
0xffff: (AuthenticationError, "Authentication failed"),
0xfffe: (ConnectionClosedError, "You have been logged out"),
0xfffd: (DeviceOfflineError, "The device is offline"),
0xfffc: (CommandNotSupportedError, "Command not supported"),
0xfffb: (StorageError, "The device storage is full"),
0xfffa: (DataValidationError, "Structure is abnormal"),
0xfff9: (AuthorizationError, "Control key is expired"),
0xfff8: (SendError, "Send error"),
0xfff7: (WriteError, "Write error"),
0xfff6: (ReadError, "Read error"),
0xfff5: (SSIDNotFoundError, "SSID could not be found in AP configuration"),
}


def exception(error_code):
"""Return exception corresponding to an error code."""
try:
exc, msg = FIRMWARE_ERRORS[error_code]
return exc(msg)
except KeyError:
return UnknownError("Unknown error: " + hex(error_code))


def check_error(error):
"""Raise exception if an error occurred."""
error_code = error[0] | (error[1] << 8)
if error_code:
raise exception(error_code)