diff --git a/homeassistant/components/device_tracker/tplink.py b/homeassistant/components/device_tracker/tplink.py index 6c5fb697c072da..ccdd1ede3e11dd 100644 --- a/homeassistant/components/device_tracker/tplink.py +++ b/homeassistant/components/device_tracker/tplink.py @@ -35,7 +35,8 @@ def get_scanner(hass, config): """Validate the configuration and return a TP-Link scanner.""" - for cls in [Tplink5DeviceScanner, Tplink4DeviceScanner, + for cls in [TplinkPowerLineDeviceScanner, TplinkC7v160616DeviceScanner, + Tplink5DeviceScanner, Tplink4DeviceScanner, Tplink3DeviceScanner, Tplink2DeviceScanner, TplinkDeviceScanner]: scanner = cls(config[DOMAIN]) @@ -143,7 +144,7 @@ def _update_info(self): self.last_results = { device['mac_addr'].replace('-', ':'): device['name'] for device in result - } + } return True return False @@ -417,3 +418,158 @@ def _update_info(self): return True return False + + +class TplinkPowerLineDeviceScanner(TplinkDeviceScanner): + """This class queries a TP-Link PowerLine device.""" + + def __init__(self, config): + """Initialize the scanner.""" + self.credentials = '' + self.token = '' + super(TplinkPowerLineDeviceScanner, self).__init__(config) + + def _update_info(self): + """Ensure the information from the TP-Link router is up to date. + + Return boolean if scanning successful. + """ + referer = 'http://{}'.format(self.host) + url = 'http://{}/admin/wireless?form=statistics'.format(self.host) + credentials = '{}:{}'.format(self.username, self.password).encode( + 'utf') + b64credentials = base64.b64encode(credentials).decode('utf') + cookie_content = requests.compat.quote(' ' + b64credentials) + cookie = {'Authorization': 'Basic{}'.format(cookie_content)} + + _LOGGER.info("Loading wireless clients from powerline device...") + + try: + page = requests.post(url, cookies=cookie, + headers={'referer': referer}, + data={'operation': 'load'}) + except requests.ConnectionError: + return False + result = self.parse_macs.findall(page.text) + + if not result: + return False + + self.last_results = [mac.replace("-", ":") for mac in result] + return True + + +class TplinkC7v160616DeviceScanner(TplinkDeviceScanner): + """This class queries an Archer C7 router with TP-Link firmware 160616.""" + + def __init__(self, config): + """Initialize the scanner.""" + self.credentials = '' + self.token = '' + super(TplinkC7v160616DeviceScanner, self).__init__(config) + # Parent initializer calls _update_info(), which leaves us logged + # in to the router. Do an explicit log out to clear this. + if self.token: + self._log_out() + + def scan_devices(self): + """Scan for new devices and return a list with found device IDs.""" + self._update_info() + if self.token: + self._log_out() + return self.last_results + + def _get_auth_token(self): + """Retrieve auth tokens from the router.""" + _LOGGER.info("Retrieving auth tokens... for C7 router") + url = 'http://{}/userRpm/LoginRpm.htm?Save=Save'.format(self.host) + referer = 'http://{}'.format(self.host) + + # There seems to be some sort of state machine maintained on the + # router that needs a request here first. + requests.get(referer) + + # Generate md5 hash of password. The C7 appears to use the first 15 + # characters of the password only, so we truncate to remove additional + # characters from being hashed. + password = hashlib.md5(self.password.encode('utf')).hexdigest() + credentials = '{}:{}'.format(self.username, password).encode('utf') + + # Encode the credentials to be sent as a cookie. + credentials = base64.b64encode(credentials).decode('utf') + credentials = requests.compat.quote(' ' + credentials) + self.credentials = credentials + + # Create the authorization cookie. + cookie = {'Authorization': 'Basic{}'.format(credentials)} + + response = requests.get(url, cookies=cookie, + headers={'Referer': referer}, timeout=4) + + try: + result = re.search(r'window.parent.location.href = ' + r'"https?:\/\/.*\/(.*)\/userRpm\/Index.htm";', + response.text) + if not result: + _LOGGER.error( + "Couldn\'t fetch auth tokens for C7 " + "router %s - unexpected response", self.host) + _LOGGER.error(response.text) + return False + self.token = result.group(1) + return True + except ValueError: + _LOGGER.error( + "Couldn't fetch auth tokens for C7" + " router %s - token not found", self.host) + return False + + def _log_out(self): + """Log out of the router.""" + _LOGGER.info('Logging out of C7 router admin interface...') + url = 'http://{}/{}/userRpm/LogoutRpm.htm' \ + .format(self.host, self.token) + referer = 'http://{}/userRpm/MenuRpm.htm'.format(self.host) + cookie = {'Authorization': 'Basic{}'.format(self.credentials)} + response = requests.get(url, cookies=cookie, + headers={'Referer': referer}, timeout=10) + if response.status_code != 200: + _LOGGER.error('Failed to log out of C7 Router') + self.token = '' + self.credentials = '' + + def _update_info(self): + """Ensure the information from the TP-Link router is up to date. + + Return boolean if scanning successful. + """ + if self.token == '': + got_token = self._get_auth_token() + if not got_token: + return False + + _LOGGER.info("Loading wireless clients from C7 Router... %s", + self.host) + + mac_results = [] + + # Check both the 2.4GHz and 5GHz client list URLs. + for clients_url in ('WlanStationRpm.htm', 'WlanStationRpm_5g.htm'): + url = 'http://{}/{}/userRpm/{}' \ + .format(self.host, self.token, clients_url) + referer = 'http://{}/userRpm/MenuRpm.htm'.format(self.host) + cookie = {'Authorization': 'Basic{}'.format(self.credentials)} + + try: + page = requests.get(url, cookies=cookie, + headers={'Referer': referer}, timeout=10) + except requests.ConnectionError: + return False + + mac_results.extend(self.parse_macs.findall(page.text)) + + if not mac_results: + return False + + self.last_results = [mac.replace("-", ":") for mac in mac_results] + return True