Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 158 additions & 2 deletions homeassistant/components/device_tracker/tplink.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@

def get_scanner(hass, config):
"""Validate the configuration and return a TP-Link scanner."""
for cls in [Tplink5DeviceScanner, Tplink4DeviceScanner,
for cls in [TplinkPowerLineDeviceScanner, TplinkC7v160616DeviceScanner,
Tplink5DeviceScanner, Tplink4DeviceScanner,
Tplink3DeviceScanner, Tplink2DeviceScanner,
TplinkDeviceScanner]:
scanner = cls(config[DOMAIN])
Expand Down Expand Up @@ -143,7 +144,7 @@ def _update_info(self):
self.last_results = {
device['mac_addr'].replace('-', ':'): device['name']
for device in result
}
}
return True

return False
Expand Down Expand Up @@ -417,3 +418,158 @@ def _update_info(self):
return True

return False


class TplinkPowerLineDeviceScanner(TplinkDeviceScanner):
"""This class queries a TP-Link PowerLine device."""

def __init__(self, config):
"""Initialize the scanner."""
self.credentials = ''
self.token = ''
super(TplinkPowerLineDeviceScanner, self).__init__(config)

def _update_info(self):
"""Ensure the information from the TP-Link router is up to date.

Return boolean if scanning successful.
"""
referer = 'http://{}'.format(self.host)
url = 'http://{}/admin/wireless?form=statistics'.format(self.host)
credentials = '{}:{}'.format(self.username, self.password).encode(
'utf')
b64credentials = base64.b64encode(credentials).decode('utf')
cookie_content = requests.compat.quote(' ' + b64credentials)
cookie = {'Authorization': 'Basic{}'.format(cookie_content)}

_LOGGER.info("Loading wireless clients from powerline device...")

try:
page = requests.post(url, cookies=cookie,
headers={'referer': referer},
data={'operation': 'load'})
except requests.ConnectionError:
return False
result = self.parse_macs.findall(page.text)

if not result:
return False

self.last_results = [mac.replace("-", ":") for mac in result]
return True


class TplinkC7v160616DeviceScanner(TplinkDeviceScanner):
"""This class queries an Archer C7 router with TP-Link firmware 160616."""

def __init__(self, config):
"""Initialize the scanner."""
self.credentials = ''
self.token = ''
super(TplinkC7v160616DeviceScanner, self).__init__(config)
# Parent initializer calls _update_info(), which leaves us logged
# in to the router. Do an explicit log out to clear this.
if self.token:
self._log_out()

def scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
if self.token:
self._log_out()
return self.last_results

def _get_auth_token(self):
"""Retrieve auth tokens from the router."""
_LOGGER.info("Retrieving auth tokens... for C7 router")
url = 'http://{}/userRpm/LoginRpm.htm?Save=Save'.format(self.host)
referer = 'http://{}'.format(self.host)

# There seems to be some sort of state machine maintained on the
# router that needs a request here first.
requests.get(referer)

# Generate md5 hash of password. The C7 appears to use the first 15
# characters of the password only, so we truncate to remove additional
# characters from being hashed.
password = hashlib.md5(self.password.encode('utf')).hexdigest()
credentials = '{}:{}'.format(self.username, password).encode('utf')

# Encode the credentials to be sent as a cookie.
credentials = base64.b64encode(credentials).decode('utf')
credentials = requests.compat.quote(' ' + credentials)
self.credentials = credentials

# Create the authorization cookie.
cookie = {'Authorization': 'Basic{}'.format(credentials)}

response = requests.get(url, cookies=cookie,
headers={'Referer': referer}, timeout=4)

try:
result = re.search(r'window.parent.location.href = '
r'"https?:\/\/.*\/(.*)\/userRpm\/Index.htm";',
response.text)
if not result:
_LOGGER.error(
"Couldn\'t fetch auth tokens for C7 "
"router %s - unexpected response", self.host)
_LOGGER.error(response.text)
return False
self.token = result.group(1)
return True
except ValueError:
_LOGGER.error(
"Couldn't fetch auth tokens for C7"
" router %s - token not found", self.host)
return False

def _log_out(self):
"""Log out of the router."""
_LOGGER.info('Logging out of C7 router admin interface...')
url = 'http://{}/{}/userRpm/LogoutRpm.htm' \
.format(self.host, self.token)
referer = 'http://{}/userRpm/MenuRpm.htm'.format(self.host)
cookie = {'Authorization': 'Basic{}'.format(self.credentials)}
response = requests.get(url, cookies=cookie,
headers={'Referer': referer}, timeout=10)
if response.status_code != 200:
_LOGGER.error('Failed to log out of C7 Router')
self.token = ''
self.credentials = ''

def _update_info(self):
"""Ensure the information from the TP-Link router is up to date.

Return boolean if scanning successful.
"""
if self.token == '':
got_token = self._get_auth_token()
if not got_token:
return False

_LOGGER.info("Loading wireless clients from C7 Router... %s",
self.host)

mac_results = []

# Check both the 2.4GHz and 5GHz client list URLs.
for clients_url in ('WlanStationRpm.htm', 'WlanStationRpm_5g.htm'):
url = 'http://{}/{}/userRpm/{}' \
.format(self.host, self.token, clients_url)
referer = 'http://{}/userRpm/MenuRpm.htm'.format(self.host)
cookie = {'Authorization': 'Basic{}'.format(self.credentials)}

try:
page = requests.get(url, cookies=cookie,
headers={'Referer': referer}, timeout=10)
except requests.ConnectionError:
return False

mac_results.extend(self.parse_macs.findall(page.text))

if not mac_results:
return False

self.last_results = [mac.replace("-", ":") for mac in mac_results]
return True