-
-
Notifications
You must be signed in to change notification settings - Fork 32.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add new device tracker for Huawei Routers. (#8488)
* Add new device tracker for Huawei Routers. This was tested with the HG8247H model, used by Vodafone Portugal for the Fiber service. * add to .coveragerc; remove import and space * add comments and fix lint * rename methods * huawei_router: add constants to scanner class * huawei_router: remove lock; use format() in string * huawei_router: use tupple instead of member only class * huawei_router: reduce min scan time * huawei_router: lint * huawei_router: lint * huawei_router: add missing lines in imports * huawei_router: correctly decode string after router firmware update * Remove things that is done on core now
- Loading branch information
Showing
2 changed files
with
146 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
145 changes: 145 additions & 0 deletions
145
homeassistant/components/device_tracker/huawei_router.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
""" | ||
Support for HUAWEI routers. | ||
For more details about this platform, please refer to the documentation at | ||
https://home-assistant.io/components/device_tracker.huawei/ | ||
""" | ||
import base64 | ||
import logging | ||
import re | ||
from collections import namedtuple | ||
|
||
import requests | ||
import voluptuous as vol | ||
|
||
import homeassistant.helpers.config_validation as cv | ||
from homeassistant.components.device_tracker import ( | ||
DOMAIN, PLATFORM_SCHEMA, DeviceScanner) | ||
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ | ||
vol.Required(CONF_HOST): cv.string, | ||
vol.Required(CONF_PASSWORD): cv.string, | ||
vol.Required(CONF_USERNAME): cv.string | ||
}) | ||
|
||
|
||
# pylint: disable=unused-argument | ||
def get_scanner(hass, config): | ||
"""Validate the configuration and return a HUAWEI scanner.""" | ||
scanner = HuaweiDeviceScanner(config[DOMAIN]) | ||
|
||
return scanner | ||
|
||
|
||
Device = namedtuple('Device', ['name', 'ip', 'mac', 'state']) | ||
|
||
|
||
class HuaweiDeviceScanner(DeviceScanner): | ||
"""This class queries a router running HUAWEI firmware.""" | ||
|
||
ARRAY_REGEX = re.compile(r'var UserDevinfo = new Array\((.*),null\);') | ||
DEVICE_REGEX = re.compile(r'new USERDevice\((.*?)\),') | ||
DEVICE_ATTR_REGEX = re.compile( | ||
'"(?P<Domain>.*?)","(?P<IpAddr>.*?)",' | ||
'"(?P<MacAddr>.*?)","(?P<Port>.*?)",' | ||
'"(?P<IpType>.*?)","(?P<DevType>.*?)",' | ||
'"(?P<DevStatus>.*?)","(?P<PortType>.*?)",' | ||
'"(?P<Time>.*?)","(?P<HostName>.*?)",' | ||
'"(?P<IPv4Enabled>.*?)","(?P<IPv6Enabled>.*?)",' | ||
'"(?P<DeviceType>.*?)"') | ||
LOGIN_COOKIE = dict(Cookie='body:Language:portuguese:id=-1') | ||
|
||
def __init__(self, config): | ||
"""Initialize the scanner.""" | ||
self.host = config[CONF_HOST] | ||
self.username = config[CONF_USERNAME] | ||
self.password = base64.b64encode(bytes(config[CONF_PASSWORD], 'utf-8')) | ||
|
||
self.last_results = [] | ||
|
||
def scan_devices(self): | ||
"""Scan for new devices and return a list with found device IDs.""" | ||
self._update_info() | ||
return [client.mac for client in self.last_results] | ||
|
||
def get_device_name(self, device): | ||
"""Return the name of the given device or None if we don't know.""" | ||
if not self.last_results: | ||
return None | ||
for client in self.last_results: | ||
if client.mac == device: | ||
return client.name | ||
return None | ||
|
||
def _update_info(self): | ||
"""Ensure the information from the router is up to date. | ||
Return boolean if scanning successful. | ||
""" | ||
data = self._get_data() | ||
if not data: | ||
return False | ||
|
||
active_clients = [client for client in data if client.state] | ||
self.last_results = active_clients | ||
|
||
_LOGGER.debug("Active clients: " + "\n" | ||
.join((client.mac + " " + client.name) | ||
for client in active_clients)) | ||
return True | ||
|
||
def _get_data(self): | ||
"""Get the devices' data from the router. | ||
Returns a list with all the devices known to the router DHCP server. | ||
""" | ||
array_regex_res = self.ARRAY_REGEX.search(self._get_devices_response()) | ||
|
||
devices = [] | ||
if array_regex_res: | ||
device_regex_res = self.DEVICE_REGEX.findall( | ||
array_regex_res.group(1)) | ||
|
||
for device in device_regex_res: | ||
device_attrs_regex_res = self.DEVICE_ATTR_REGEX.search(device) | ||
|
||
devices.append(Device(device_attrs_regex_res.group('HostName'), | ||
device_attrs_regex_res.group('IpAddr'), | ||
device_attrs_regex_res.group('MacAddr'), | ||
device_attrs_regex_res.group( | ||
'DevStatus') == "Online")) | ||
|
||
return devices | ||
|
||
def _get_devices_response(self): | ||
"""Get the raw string with the devices from the router.""" | ||
cnt = requests.post('http://{}/asp/GetRandCount.asp'.format(self.host)) | ||
cnt_str = str(cnt.content, cnt.apparent_encoding, errors='replace') | ||
|
||
_LOGGER.debug("Loggin in") | ||
cookie = requests.post('http://{}/login.cgi'.format(self.host), | ||
data=[('UserName', self.username), | ||
('PassWord', self.password), | ||
('x.X_HW_Token', cnt_str)], | ||
cookies=self.LOGIN_COOKIE) | ||
|
||
_LOGGER.debug("Requesting lan user info update") | ||
# this request is needed or else some devices' state won't be updated | ||
requests.get( | ||
'http://{}/html/bbsp/common/lanuserinfo.asp'.format(self.host), | ||
cookies=cookie.cookies) | ||
|
||
_LOGGER.debug("Requesting lan user info data") | ||
devices = requests.get( | ||
'http://{}/html/bbsp/common/GetLanUserDevInfo.asp'.format( | ||
self.host), | ||
cookies=cookie.cookies) | ||
|
||
# we need to decode() using the request encoding, then encode() and | ||
# decode('unicode_escape') to replace \\xXX with \xXX | ||
# (i.e. \\x2d -> \x2d) | ||
return devices.content.decode(devices.apparent_encoding).encode().\ | ||
decode('unicode_escape') |
d195fd4
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello. I am trying this script on a HG8247H router from Romania (a router provided by my internet provider). I guess the firmware is a little bit different (the UI) and there are some problems on the login page (the page differ a little bit). My question is, how can I run it and do some debug on it? This is my first time I am trying to code something on the Home Assistant. I want to make it working with my router firmware version.
d195fd4
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For you to make an ideea, the cnt code is injected when you load the login page
<script language="JavaScript" type="text/javascript"> -- | function GetRandCnt() { return '9381c1859c6898efb2ea58f3aaec2670'; }