Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ omit =
homeassistant/components/device_tracker/gpslogger.py
homeassistant/components/device_tracker/huawei_router.py
homeassistant/components/device_tracker/icloud.py
homeassistant/components/device_tracker/keenetic_ndms2.py
homeassistant/components/device_tracker/linksys_ap.py
homeassistant/components/device_tracker/linksys_smart.py
homeassistant/components/device_tracker/luci.py
Expand Down
121 changes: 121 additions & 0 deletions homeassistant/components/device_tracker/keenetic_ndms2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""
Support for Zyxel Keenetic NDMS2 based routers.

For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.keenetic_ndms2/
"""
import logging
from collections import namedtuple

import requests
import voluptuous as vol

import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME
)

_LOGGER = logging.getLogger(__name__)

# Interface name to track devices for. Most likely one will not need to
# change it from default 'Home'. This is needed not to track Guest WI-FI-
# clients and router itself
CONF_INTERFACE = 'interface'

DEFAULT_INTERFACE = 'Home'


PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_INTERFACE, default=DEFAULT_INTERFACE): cv.string,
})


def get_scanner(_hass, config):
"""Validate the configuration and return a Nmap scanner."""
scanner = KeeneticNDMS2DeviceScanner(config[DOMAIN])

return scanner if scanner.success_init else None


Device = namedtuple('Device', ['mac', 'name'])


class KeeneticNDMS2DeviceScanner(DeviceScanner):
"""This class scans for devices using keenetic NDMS2 web interface."""

def __init__(self, config):
"""Initialize the scanner."""
self.last_results = []

self._url = 'http://%s/rci/show/ip/arp' % config[CONF_HOST]
self._interface = config[CONF_INTERFACE]

self._username = config.get(CONF_USERNAME)
self._password = config.get(CONF_PASSWORD)

self.success_init = self._update_info()
_LOGGER.info("Scanner initialized")

def scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
self._update_info()

return [device.mac for device in self.last_results]

def get_device_name(self, mac):
"""Return the name of the given device or None if we don't know."""
filter_named = [device.name for device in self.last_results
if device.mac == mac]

if filter_named:
return filter_named[0]
return None

def _update_info(self):
"""Get ARP from keenetic router."""
_LOGGER.info("Fetching...")

last_results = []

# doing a request
try:
from requests.auth import HTTPDigestAuth
res = requests.get(self._url, timeout=10, auth=HTTPDigestAuth(
self._username, self._password
))
except requests.exceptions.Timeout:
_LOGGER.error(
"Connection to the router timed out at URL %s", self._url)
return False
if res.status_code != 200:
_LOGGER.error(
"Connection failed with http code %s", res.status_code)
return False
try:
result = res.json()
except ValueError:
# If json decoder could not parse the response
_LOGGER.error("Failed to parse response from router")
return False

# parsing response
for info in result:
if info.get('interface') != self._interface:
continue
mac = info.get('mac')
name = info.get('name')
# No address = no item :)
if mac is None:
continue

last_results.append(Device(mac.upper(), name))

self.last_results = last_results

_LOGGER.info("Request successful")
return True