Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ homeassistant/components/climate/mill.py @danielhiversen
homeassistant/components/climate/sensibo.py @andrey-git
homeassistant/components/cover/group.py @cdce8p
homeassistant/components/cover/template.py @PhracturedBlue
homeassistant/components/device_tracker/asuswrt.py @kennedyshead
homeassistant/components/device_tracker/automatic.py @armills
homeassistant/components/device_tracker/huawei_router.py @abmantis
homeassistant/components/device_tracker/quantum_gateway.py @cisasteelersfan
Expand Down
338 changes: 22 additions & 316 deletions homeassistant/components/device_tracker/asuswrt.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
https://home-assistant.io/components/device_tracker.asuswrt/
"""
import logging
import re
import socket
import telnetlib
from collections import namedtuple

import voluptuous as vol

Expand All @@ -19,7 +15,7 @@
CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_PORT, CONF_MODE,
CONF_PROTOCOL)

REQUIREMENTS = ['pexpect==4.6.0']
REQUIREMENTS = ['aioasuswrt==1.0.0']

_LOGGER = logging.getLogger(__name__)

Expand All @@ -44,345 +40,55 @@
}))


_LEASES_CMD = 'cat /var/lib/misc/dnsmasq.leases'
_LEASES_REGEX = re.compile(
r'\w+\s' +
r'(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))\s' +
r'(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\s' +
r'(?P<host>([^\s]+))')

# Command to get both 5GHz and 2.4GHz clients
_WL_CMD = 'for dev in `nvram get wl_ifnames`; do wl -i $dev assoclist; done'
_WL_REGEX = re.compile(
r'\w+\s' +
r'(?P<mac>(([0-9A-F]{2}[:-]){5}([0-9A-F]{2})))')

_IP_NEIGH_CMD = 'ip neigh'
_IP_NEIGH_REGEX = re.compile(
r'(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3}|'
r'([0-9a-fA-F]{1,4}:){1,7}[0-9a-fA-F]{0,4}(:[0-9a-fA-F]{1,4}){1,7})\s'
r'\w+\s'
r'\w+\s'
r'(\w+\s(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2}))))?\s'
r'\s?(router)?'
r'\s?(nud)?'
r'(?P<status>(\w+))')

_ARP_CMD = 'arp -n'
_ARP_REGEX = re.compile(
r'.+\s' +
r'\((?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\)\s' +
r'.+\s' +
r'(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))' +
r'\s' +
r'.*')


def get_scanner(hass, config):
async def async_get_scanner(hass, config):
"""Validate the configuration and return an ASUS-WRT scanner."""
scanner = AsusWrtDeviceScanner(config[DOMAIN])

await scanner.async_connect()
return scanner if scanner.success_init else None


def _parse_lines(lines, regex):
"""Parse the lines using the given regular expression.

If a line can't be parsed it is logged and skipped in the output.
"""
results = []
for line in lines:
match = regex.search(line)
if not match:
_LOGGER.debug("Could not parse row: %s", line)
continue
results.append(match.groupdict())
return results


Device = namedtuple('Device', ['mac', 'ip', 'name'])


class AsusWrtDeviceScanner(DeviceScanner):
"""This class queries a router running ASUSWRT firmware."""

# Eighth attribute needed for mode (AP mode vs router mode)
def __init__(self, config):
"""Initialize the scanner."""
self.host = config[CONF_HOST]
self.username = config[CONF_USERNAME]
self.password = config.get(CONF_PASSWORD, '')
self.ssh_key = config.get('ssh_key', config.get('pub_key', ''))
self.protocol = config[CONF_PROTOCOL]
self.mode = config[CONF_MODE]
self.port = config[CONF_PORT]
self.require_ip = config[CONF_REQUIRE_IP]

if self.protocol == 'ssh':
self.connection = SshConnection(
self.host, self.port, self.username, self.password,
self.ssh_key)
else:
self.connection = TelnetConnection(
self.host, self.port, self.username, self.password)
from aioasuswrt.asuswrt import AsusWrt

self.last_results = {}
self.success_init = False
self.connection = AsusWrt(config[CONF_HOST], config[CONF_PORT],
config[CONF_PROTOCOL] == 'telnet',
config[CONF_USERNAME],
config.get(CONF_PASSWORD, ''),
config.get('ssh_key',
config.get('pub_key', '')),
config[CONF_MODE], config[CONF_REQUIRE_IP])

async def async_connect(self):
"""Initialize connection to the router."""
self.last_results = {}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to reset this here? It's already initialized in init.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, that is just sloppy coding

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was left.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙈

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


# Test the router is accessible.
data = self.get_asuswrt_data()
data = await self.connection.async_get_connected_devices()
self.success_init = data is not None

def scan_devices(self):
async def async_scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
await self.async_update_info()
return list(self.last_results.keys())

def get_device_name(self, device):
async def async_get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
if device not in self.last_results:
return None
return self.last_results[device].name

def _update_info(self):
async def async_update_info(self):
"""Ensure the information from the ASUSWRT router is up to date.

Return boolean if scanning successful.
"""
if not self.success_init:
return False

_LOGGER.info('Checking Devices')
data = self.get_asuswrt_data()
if not data:
return False

self.last_results = data
return True

def get_asuswrt_data(self):
"""Retrieve data from ASUSWRT.

Calls various commands on the router and returns the superset of all
responses. Some commands will not work on some routers.
"""
devices = {}
devices.update(self._get_wl())
devices.update(self._get_arp())
devices.update(self._get_neigh(devices))
if not self.mode == 'ap':
devices.update(self._get_leases(devices))

ret_devices = {}
for key in devices:
if not self.require_ip or devices[key].ip is not None:
ret_devices[key] = devices[key]
return ret_devices

def _get_wl(self):
lines = self.connection.run_command(_WL_CMD)
if not lines:
return {}
result = _parse_lines(lines, _WL_REGEX)
devices = {}
for device in result:
mac = device['mac'].upper()
devices[mac] = Device(mac, None, None)
return devices

def _get_leases(self, cur_devices):
lines = self.connection.run_command(_LEASES_CMD)
if not lines:
return {}
lines = [line for line in lines if not line.startswith('duid ')]
result = _parse_lines(lines, _LEASES_REGEX)
devices = {}
for device in result:
# For leases where the client doesn't set a hostname, ensure it
# is blank and not '*', which breaks entity_id down the line.
host = device['host']
if host == '*':
host = ''
mac = device['mac'].upper()
if mac in cur_devices:
devices[mac] = Device(mac, device['ip'], host)
return devices

def _get_neigh(self, cur_devices):
lines = self.connection.run_command(_IP_NEIGH_CMD)
if not lines:
return {}
result = _parse_lines(lines, _IP_NEIGH_REGEX)
devices = {}
for device in result:
status = device['status']
if status is None or status.upper() != 'REACHABLE':
continue
if device['mac'] is not None:
mac = device['mac'].upper()
old_device = cur_devices.get(mac)
old_ip = old_device.ip if old_device else None
devices[mac] = Device(mac, device.get('ip', old_ip), None)
return devices

def _get_arp(self):
lines = self.connection.run_command(_ARP_CMD)
if not lines:
return {}
result = _parse_lines(lines, _ARP_REGEX)
devices = {}
for device in result:
if device['mac'] is not None:
mac = device['mac'].upper()
devices[mac] = Device(mac, device['ip'], None)
return devices


class _Connection:
def __init__(self):
self._connected = False

@property
def connected(self):
"""Return connection state."""
return self._connected

def connect(self):
"""Mark current connection state as connected."""
self._connected = True

def disconnect(self):
"""Mark current connection state as disconnected."""
self._connected = False


class SshConnection(_Connection):
"""Maintains an SSH connection to an ASUS-WRT router."""

def __init__(self, host, port, username, password, ssh_key):
"""Initialize the SSH connection properties."""
super().__init__()

self._ssh = None
self._host = host
self._port = port
self._username = username
self._password = password
self._ssh_key = ssh_key

def run_command(self, command):
"""Run commands through an SSH connection.

Connect to the SSH server if not currently connected, otherwise
use the existing connection.
"""
from pexpect import pxssh, exceptions

try:
if not self.connected:
self.connect()
self._ssh.sendline(command)
self._ssh.prompt()
lines = self._ssh.before.split(b'\n')[1:-1]
return [line.decode('utf-8') for line in lines]
except exceptions.EOF as err:
_LOGGER.error("Connection refused. %s", self._ssh.before)
self.disconnect()
return None
except pxssh.ExceptionPxssh as err:
_LOGGER.error("Unexpected SSH error: %s", err)
self.disconnect()
return None
except AssertionError as err:
_LOGGER.error("Connection to router unavailable: %s", err)
self.disconnect()
return None

def connect(self):
"""Connect to the ASUS-WRT SSH server."""
from pexpect import pxssh

self._ssh = pxssh.pxssh()
if self._ssh_key:
self._ssh.login(self._host, self._username, quiet=False,
ssh_key=self._ssh_key, port=self._port)
else:
self._ssh.login(self._host, self._username, quiet=False,
password=self._password, port=self._port)

super().connect()

def disconnect(self):
"""Disconnect the current SSH connection."""
try:
self._ssh.logout()
except Exception: # pylint: disable=broad-except
pass
finally:
self._ssh = None

super().disconnect()


class TelnetConnection(_Connection):
"""Maintains a Telnet connection to an ASUS-WRT router."""

def __init__(self, host, port, username, password):
"""Initialize the Telnet connection properties."""
super().__init__()

self._telnet = None
self._host = host
self._port = port
self._username = username
self._password = password
self._prompt_string = None

def run_command(self, command):
"""Run a command through a Telnet connection.

Connect to the Telnet server if not currently connected, otherwise
use the existing connection.
"""
try:
if not self.connected:
self.connect()
self._telnet.write('{}\n'.format(command).encode('ascii'))
data = (self._telnet.read_until(self._prompt_string).
split(b'\n')[1:-1])
return [line.decode('utf-8') for line in data]
except EOFError:
_LOGGER.error("Unexpected response from router")
self.disconnect()
return None
except ConnectionRefusedError:
_LOGGER.error("Connection refused by router. Telnet enabled?")
self.disconnect()
return None
except socket.gaierror as exc:
_LOGGER.error("Socket exception: %s", exc)
self.disconnect()
return None
except OSError as exc:
_LOGGER.error("OSError: %s", exc)
self.disconnect()
return None

def connect(self):
"""Connect to the ASUS-WRT Telnet server."""
self._telnet = telnetlib.Telnet(self._host)
self._telnet.read_until(b'login: ')
self._telnet.write((self._username + '\n').encode('ascii'))
self._telnet.read_until(b'Password: ')
self._telnet.write((self._password + '\n').encode('ascii'))
self._prompt_string = self._telnet.read_until(b'#').split(b'\n')[-1]

super().connect()

def disconnect(self):
"""Disconnect the current Telnet connection."""
try:
self._telnet.write('exit\n'.encode('ascii'))
except Exception: # pylint: disable=broad-except
pass

super().disconnect()
self.last_results = await self.connection.async_get_connected_devices()
Loading