diff --git a/pynetgear/__init__.py b/pynetgear/__init__.py index 4d169f1..df0e63a 100644 --- a/pynetgear/__init__.py +++ b/pynetgear/__init__.py @@ -1,773 +1,5 @@ """Module to communicate with Netgear routers using the SOAP v2 API.""" -from __future__ import print_function +# flake8: noqa -from io import StringIO -from collections import namedtuple -import logging -import xml.etree.ElementTree as ET -from datetime import timedelta -import re -import sys - -import requests -from urllib3 import disable_warnings -from urllib3.exceptions import InsecureRequestWarning - -disable_warnings(InsecureRequestWarning) - -# define regex to filter invalid XML codes -# cf https://stackoverflow.com/questions/1707890/fast-way-to-filter-illegal-xml-unicode-chars-in-python -if sys.version_info[0] == 3: - unichr = chr -_illegal_unichrs = [(0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), - (0x7F, 0x84), (0x86, 0x9F), - (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF)] -if sys.maxunicode >= 0x10000: # not narrow build - _illegal_unichrs.extend([(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF), - (0x3FFFE, 0x3FFFF), (0x4FFFE, 0x4FFFF), - (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF), - (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF), - (0x9FFFE, 0x9FFFF), (0xAFFFE, 0xAFFFF), - (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF), - (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF), - (0xFFFFE, 0xFFFFF), (0x10FFFE, 0x10FFFF)]) - -_illegal_ranges = ["%s-%s" % (unichr(low), unichr(high)) - for (low, high) in _illegal_unichrs] -_illegal_xml_chars_RE = re.compile(u'[%s]' % u''.join(_illegal_ranges)) - - -DEFAULT_HOST = 'routerlogin.net' -DEFAULT_USER = 'admin' -DEFAULT_PORT = 5000 -ALL_PORTS = [(5555, True), (443, True), (5000, False), (80, False)] -_LOGGER = logging.getLogger(__name__) - - -BLOCK = "Block" -ALLOW = "Allow" - -Device = namedtuple( - "Device", [ - "name", - "ip", - "mac", - "type", - "signal", - "link_rate", - "allow_or_block", - "device_type", - "device_model", - "ssid", - "conn_ap_mac" - ] -) - - -class Netgear(object): - """Represents a session to a Netgear Router.""" - - def __init__( - self, - password=None, - host=None, - user=None, - port=None, - ssl=False, - url=None, - force_login_v1=False, - force_login_v2=False, - ): - """Initialize a Netgear session.""" - if not url and not host and not port: - url = autodetect_url() - - if not host: - host = DEFAULT_HOST - if not port: - port = DEFAULT_PORT - if not user: - user = DEFAULT_USER - - self.username = user - self.password = password - self.url = url - self.host = host - self.port = port - self.ssl = ssl - self.force_login_v1 = force_login_v1 - self.force_login_v2 = force_login_v2 - self.cookie = None - self.config_started = False - self._logging_in = False - self._login_version = 2 - - self._info = None - - @property - def soap_url(self): - """SOAP url to connect to the router.""" - if self.url: - return self.url + "/soap/server_sa/" - - scheme = "https" if self.ssl else "http" - return "{}://{}:{}/soap/server_sa/".format( - scheme, self.host, self.port) - - def login_try_port(self): - # first try the currently configured port-ssl combination - current_port = (self.port, self.ssl) - if self.login(): - return True - - ports = ALL_PORTS.copy() - if current_port in ports: - ports.remove(current_port) - - for port in ports: - self.port = port[0] - self.ssl = port[1] - if self.login(): - _LOGGER.info("Login succeeded using non default port " - "'%i' and ssl '%r'.", self.port, self.ssl) - return True - - # reset original port-ssl - self.port = current_port[0] - self.ssl = current_port[1] - _LOGGER.error("login using all known port-ssl combinations failed.") - return False - - def login(self): - """ - Login to the router. - - Will be called automatically by other actions. - """ - if self._logging_in: - _LOGGER.debug("Login re-attempt within the login, ignoring.") - return False - self._logging_in = True - - # cookie is also used to track if at least - # one login attempt has been made for v1 - self.cookie = None - - # if a force option is given always start with that method - if self.force_login_v1: - self._login_version = 1 - if self.force_login_v2: - self._login_version = 2 - - login_methods = [self.login_v1, self.login_v2] - for idx in range(0, len(login_methods)): - login_version = (idx + self._login_version) % len(login_methods) - login_method = login_methods[login_version-1] - if login_method(): - # login succeeded, next time start with this login method - self._logging_in = False - self._login_version = login_version - return True - - # login failed, next time start trying with the other login method - self._logging_in = False - self._login_version = self._login_version + 1 - return False - - def login_v2(self): - _LOGGER.debug("Login v2, port '%i', ssl, '%r'", self.port, self.ssl) - - success, response = self._make_request( - SERVICE_DEVICE_CONFIG, - "SOAPLogin", - { - "Username": self.username, - "Password": self.password - }, - None, - False - ) - - if not success: - return False - - if 'Set-Cookie' in response.headers: - self.cookie = response.headers['Set-Cookie'] - else: - _LOGGER.error("Login v2 ok but no cookie...") - _LOGGER.debug(response.headers) - return False - - return True - - def login_v1(self): - _LOGGER.debug("Login v1, port '%i', ssl, '%r'", self.port, self.ssl) - - body = LOGIN_V1_BODY.format(username=self.username, - password=self.password) - - success, _ = self._make_request( - "ParentalControl:1", - "Authenticate", - None, - body, - False - ) - - self.cookie = success - - # check login succes with info call - if self.get_info(use_cache=False) is None: - return False - - return True - - def get_info(self, use_cache=True): - """ - Return router informations, like: - - ModelName - - DeviceName - - SerialNumber - - Firmwareversion - - FirewallVersion - - Hardwareversion - - FirmwareLastUpdate - - FirmwareLastChecked - - Returns None if error occurred. - """ - _LOGGER.debug("Get Info") - - if self._info is not None and use_cache: - _LOGGER.debug("Info from cache.") - return self._info - - success, response = self._make_request( - SERVICE_DEVICE_INFO, - "GetInfo" - ) - if not success: - return None - - success, node = _find_node( - response.text, - ".//GetInfoResponse") - if not success: - return None - - self._info = {t.tag: t.text for t in node} - - return self._info - - def get_attached_devices(self): - """ - Return list of connected devices to the router. - - Returns None if error occurred. - """ - _LOGGER.debug("Get attached devices") - - success, response = self._make_request( - SERVICE_DEVICE_INFO, - "GetAttachDevice" - ) - - if not success: - _LOGGER.error("Get attached devices failed") - return None - - success, node = _find_node( - response.text, - ".//GetAttachDeviceResponse/NewAttachDevice") - if not success: - return None - - devices = [] - - # Netgear inserts a double-encoded value for "unknown" devices - decoded = node.text.strip().replace(UNKNOWN_DEVICE_ENCODED, - UNKNOWN_DEVICE_DECODED) - - if not decoded or decoded == "0": - _LOGGER.info("Can't parse attached devices string") - return devices - - entries = decoded.split("@") - - # First element is the total device count - entry_count = None - if len(entries) > 1: - entry_count = _convert(entries.pop(0), int) - - if entry_count is not None and entry_count != len(entries): - _LOGGER.info( - "Number of devices should be: %d but is: %d", - entry_count, - len(entries) - ) - - for entry in entries: - info = entry.split(";") - - if len(info) == 0: - continue - - # Not all routers will report those - signal = None - link_type = None - link_rate = None - allow_or_block = None - mac = None - name = None - - if len(info) >= 8: - allow_or_block = info[7] - if len(info) >= 7: - link_type = info[4] - link_rate = _convert(info[5], int) - signal = _convert(info[6], int) - if len(info) >= 4: - mac = info[3] - if len(info) >= 3: - name = info[2] - - if len(info) < 2: - _LOGGER.warning("Unexpected entry: %s", info) - continue - - ipv4 = info[1] - - devices.append(Device(name, ipv4, mac, - link_type, signal, link_rate, allow_or_block, - None, None, None, None)) - - return devices - - def get_attached_devices_2(self): - """ - Return list of connected devices to the router with details. - - This call is slower and probably heavier on the router load. - - Returns None if error occurred. - """ - _LOGGER.debug("Get attached devices 2") - - success, response = self._make_request( - SERVICE_DEVICE_INFO, - "GetAttachDevice2" - ) - if not success: - return None - - success, devices_node = _find_node( - response.text, - ".//GetAttachDevice2Response/NewAttachDevice") - if not success: - return None - - xml_devices = devices_node.findall("Device") - devices = [] - for d in xml_devices: - ip = _xml_get(d, 'IP') - name = _xml_get(d, 'Name') - mac = _xml_get(d, 'MAC') - signal = _convert(_xml_get(d, 'SignalStrength'), int) - link_type = _xml_get(d, 'ConnectionType') - link_rate = _xml_get(d, 'Linkspeed') - allow_or_block = _xml_get(d, 'AllowOrBlock') - device_type = _convert(_xml_get(d, 'DeviceType'), int) - device_model = _xml_get(d, 'DeviceModel') - ssid = _xml_get(d, 'SSID') - conn_ap_mac = _xml_get(d, 'ConnAPMAC') - devices.append( - Device( - name, - ip, - mac, - link_type, - signal, - link_rate, - allow_or_block, - device_type, - device_model, - ssid, - conn_ap_mac - ) - ) - - return devices - - def get_traffic_meter(self): - """ - Return dict of traffic meter stats. - - Returns None if error occurred. - """ - _LOGGER.debug("Get traffic meter") - - def parse_text(text): - """ - there are three kinds of values in the returned data - This function parses the different values and returns - (total, avg), timedelta or a plain float - """ - def tofloats(lst): return (float(t) for t in lst) - try: - text = text.replace(',', '') # 25,350.10 MB - if "--" in text: - return None - if "/" in text: # "6.19/0.88" total/avg - return tuple(tofloats(text.split('/'))) - if ":" in text: # 11:14 hr:mn - hour, mins = tofloats(text.split(':')) - return timedelta(hours=hour, minutes=mins) - return float(text) - except ValueError: - _LOGGER.error("Error parsing traffic meter stats: %s", text) - return None - - success, response = self._make_request( - SERVICE_DEVICE_CONFIG, - "GetTrafficMeterStatistics" - ) - if not success: - return None - - success, node = _find_node( - response.text, - ".//GetTrafficMeterStatisticsResponse") - if not success: - return None - - return {t.tag: parse_text(t.text) for t in node} - - def config_start(self): - """ - Start a configuration session. - For managing router admin functionality (ie allowing/blocking devices) - """ - _LOGGER.debug("Config start") - - success, _ = self._make_request( - SERVICE_DEVICE_CONFIG, - "ConfigurationStarted", - { - "NewSessionID": SESSION_ID - } - ) - - self.config_started = success - return success - - def config_finish(self): - """ - End of a configuration session. - Tells the router we're done managing admin functionality. - """ - _LOGGER.debug("Config finish") - if not self.config_started: - return True - - success, _ = self._make_request( - SERVICE_DEVICE_CONFIG, - "ConfigurationFinished", - { - "NewStatus": "ChangesApplied" - } - ) - - self.config_started = not success - return success - - def allow_block_device(self, mac_addr, device_status=BLOCK): - """ - Allow or Block a device via its Mac Address. - Pass in the mac address for the device that you want to set. Pass in the - device_status you wish to set the device to: Allow (allow device to access the - network) or Block (block the device from accessing the network). - """ - _LOGGER.debug("Allow block device") - if self.config_started: - _LOGGER.error("Inconsistant configuration state, configuration already started") - if not self.config_finish(): - return False - - if not self.config_start(): - _LOGGER.error("Could not start configuration") - return False - - success, _ = self._make_request( - SERVICE_DEVICE_CONFIG, - "SetBlockDeviceByMAC", - { - "NewAllowOrBlock": device_status, - "NewMACAddress": mac_addr - } - ) - - if not success: - _LOGGER.error("Could not successfully call allow/block device") - return False - - if not self.config_finish(): - _LOGGER.error("Inconsistant configuration state, configuration already finished") - return False - - return True - - def _get_headers(self, service, method, need_auth=True): - headers = _get_soap_headers(service, method) - # if the stored cookie is not a str then we are - # probably using the old login method - if need_auth and isinstance(self.cookie, str): - headers["Cookie"] = self.cookie - return headers - - def reboot(self): - _LOGGER.debug("reboot") - if self.config_started: - _LOGGER.error("Inconsistant configuration state, configuration already started") - if not self.config_finish(): - return False - - if not self.config_start(): - _LOGGER.error("Could not start configuration") - return False - - success, _ = self._make_request( - SERVICE_DEVICE_CONFIG, - "Reboot", - ) - - if not success: - _LOGGER.error("Could not successfully call reboot") - return False - - self.config_started = False - - return True - - def _post_request(self, headers, message): - """Post the API request to the router.""" - return requests.post( - self.soap_url, headers=headers, data=message, - timeout=30, verify=False - ) - - def _make_request( - self, - service, - method, - params=None, - body="", - need_auth=True - ): - """Make an API request to the router.""" - # If we have no cookie (v2) or never called login before (v1) - # and we need auth, the request will fail for sure. - if need_auth and not self.cookie: - if not self.login(): - return False, None - - headers = self._get_headers(service, method, need_auth) - - if not body: - if not params: - params = "" - if isinstance(params, dict): - _map = params - params = "" - for k in _map: - params += "<" + k + ">" + _map[k] + "\n" - - body = CALL_BODY.format(service=SERVICE_PREFIX + service, - method=method, params=params) - - message = SOAP_REQUEST.format(session_id=SESSION_ID, body=body) - - try: - try: - response = self._post_request(headers, message) - except requests.exceptions.SSLError: - _LOGGER.debug("SSL error, thread as unauthorized response " - "and try again after re-login") - response = requests.Response() - response.status_code = 401 - - if need_auth and _is_unauthorized_response(response): - # let's discard the cookie because it probably expired (v2) - # or the IP-bound session expired (v1) - self.cookie = None - - _LOGGER.debug("Unauthorized response, " - "let's login and retry...") - if not self.login(): - _LOGGER.error("Unauthorized response, re-login failed") - return False, response - - # reset headers with new cookie first and re-try - headers = self._get_headers(service, method, need_auth) - response = self._post_request(headers, message) - - success = _is_valid_response(response) - if not success and not self._logging_in: - if _is_unauthorized_response(response): - _LOGGER.error("Unauthorized response, " - "after seemingly successful re-login") - elif _is_service_unavailable_response(response): - # try the request one more time - response = self._post_request(headers, message) - success = _is_valid_response(response) - if not success: - _LOGGER.error("503 Service Unavailable after retry, " - "the API may be overloaded.") - else: - _LOGGER.error("Invalid response: %s\n%s\n%s", - response.status_code, str(response.headers), - response.text) - - return success, response - - except requests.exceptions.RequestException: - _LOGGER.exception("Error talking to API") - self.cookie = None - - # Maybe one day we will distinguish between - # different errors.. - return False, None - - -def autodetect_url(): - """ - Try to autodetect the base URL of the router SOAP service. - - Returns None if it can't be found. - """ - DETECTABLE_URLS = [ - "http://routerlogin.net", - "http://routerlogin.net:5000", - "https://routerlogin.net", - ] - for url in DETECTABLE_URLS: - try: - resp = requests.get( - url + "/soap/server_sa/", - headers=_get_soap_headers("Test:1", "test"), - verify=False, - timeout=1 - ) - if resp.status_code == 200: - return url - except requests.exceptions.RequestException: - pass - - return None - - -def _find_node(text, xpath): - text = _illegal_xml_chars_RE.sub('', text) - - try: - it = ET.iterparse(StringIO(text)) - # strip all namespaces - for _, el in it: - if '}' in el.tag: - el.tag = el.tag.split('}', 1)[1] - except ET.ParseError: - _LOGGER.error("Error parsing XML response") - _LOGGER.debug("Error parsing XML response", exc_info=True) - _LOGGER.debug(text) - return False, None - - node = it.root.find(xpath) - if node is None: - _LOGGER.error("Error finding node in XML response") - _LOGGER.debug(text) - return False, None - - return True, node - - -def _xml_get(e, name): - """ - Returns the value of the subnode "name" of element e. - - Returns None if the subnode doesn't exist - """ - r = e.find(name) - if r is not None: - return r.text - return None - - -def _get_soap_headers(service, method): - action = SERVICE_PREFIX + service + "#" + method - return { - "SOAPAction": action, - "Cache-Control": "no-cache", - "User-Agent": "pynetgear", - "Content-Type": "multipart/form-data" - } - - -def _is_valid_response(resp): - return (resp.status_code == 200 and - ("0000000401" in resp.text) - - -def _is_service_unavailable_response(resp): - return (resp.status_code == 503 or - "503" in resp.text) - - -def _convert(value, to_type, default=None): - """Convert value to to_type, returns default if fails.""" - try: - return default if value is None else to_type(value) - except ValueError: - # If value could not be converted - return default - - -SERVICE_PREFIX = "urn:NETGEAR-ROUTER:service:" -SERVICE_DEVICE_INFO = "DeviceInfo:1" -SERVICE_DEVICE_CONFIG = "DeviceConfig:1" - -REGEX_ATTACHED_DEVICES = r"(.*)" - -# Until we know how to generate it, give the one we captured -SESSION_ID = "A7D88AE69687E58D9A00" - -SOAP_REQUEST = """ - - -{session_id} - -{body} - -""" - -LOGIN_V1_BODY = """ - - {username} - {password} - -""" - -CALL_BODY = """ - -{params} -""" - -UNKNOWN_DEVICE_DECODED = '' -UNKNOWN_DEVICE_ENCODED = '<unknown>' +from .router import Device, Netgear +from .const import ALLOW, BLOCK, DEFAULT_HOST, DEFAULT_USER, DEFAULT_PORT diff --git a/pynetgear/__main__.py b/pynetgear/__main__.py index 55ece3f..580ddca 100644 --- a/pynetgear/__main__.py +++ b/pynetgear/__main__.py @@ -12,11 +12,23 @@ def make_formatter(format_name): if "json" in format_name: from json import dumps import datetime - def jsonhandler(obj): obj.isoformat() if isinstance(obj, (datetime.datetime, datetime.date)) else obj + + def jsonhandler(obj): + obj.isoformat() if isinstance( + obj, (datetime.datetime, datetime.date) + ) else obj + if format_name == "prettyjson": - def jsondumps(data): return dumps(data, default=jsonhandler, indent=2, separators=(',', ': ')) + + def jsondumps(data): + return dumps( + data, default=jsonhandler, indent=2, separators=(",", ": ") + ) + else: - def jsondumps(data): return dumps(data, default=jsonhandler) + + def jsondumps(data): + return dumps(data, default=jsonhandler) def jsonify(data): if isinstance(data, dict): @@ -24,64 +36,84 @@ def jsonify(data): elif isinstance(data, list): print(jsondumps([device._asdict() for device in data])) else: - print(dumps({'result': data})) + print(dumps({"result": data})) + return jsonify else: + def printer(data): if isinstance(data, dict): print(data) else: for row in data: print(row) + return printer def argparser(): """Constructs the ArgumentParser for the CLI""" - parser = ArgumentParser(prog='pynetgear') + parser = ArgumentParser(prog="pynetgear") - parser.add_argument("--format", choices=['json', 'prettyjson', 'py'], default='prettyjson') + parser.add_argument( + "--format", choices=["json", "prettyjson", "py"], default="prettyjson" + ) router_args = parser.add_argument_group("router connection config") router_args.add_argument("--host", help="Hostname for the router") router_args.add_argument("--user", help="Account for login") router_args.add_argument("--port", help="Port exposed on the router") - router_args.add_argument("--login-v2", help="Force the use of the cookie-based authentication", - dest="force_login_v2", default=False, action="store_true") router_args.add_argument( - "--password", - help="Not required with a wired connection." + - "Optionally, set the PYNETGEAR_PASSWORD environment variable") + "--login-v2", + help="Force the use of the cookie-based authentication", + dest="force_login_v2", + default=False, + action="store_true", + ) + router_args.add_argument( + "--password", + help="Not required with a wired connection." + + "Optionally, set the PYNETGEAR_PASSWORD environment variable", + ) router_args.add_argument( - "--url", help="Overrides host:port and ssl with url to router") - router_args.add_argument("--no-ssl", - dest="ssl", default=True, - action="store_false", - help="Connect with https") + "--url", help="Overrides host:port and ssl with url to router" + ) + router_args.add_argument( + "--no-ssl", + dest="ssl", + default=True, + action="store_false", + help="Connect with https", + ) subparsers = parser.add_subparsers( - description="Runs subcommand against the specified router", - dest="subcommand") + description="Runs subcommand against the specified router", + dest="subcommand", + ) block_parser = subparsers.add_parser( - "block_device", - help="Blocks a device from connecting by mac address") + "block_device", help="Blocks a device from connecting by mac address" + ) block_parser.add_argument("--mac-addr") allow_parser = subparsers.add_parser( - "allow_device", - help="Allows a device with the mac address to connect") + "allow_device", help="Allows a device with the mac address to connect" + ) allow_parser.add_argument("--mac-addr") subparsers.add_parser("login", help="Attempts to login to router.") - attached_devices = subparsers.add_parser("attached_devices", help="Outputs all attached devices") + attached_devices = subparsers.add_parser( + "attached_devices", help="Outputs all attached devices" + ) attached_devices.add_argument( - "-v", "--verbose", - action="store_true", - default=False, - help="Choose between verbose and slower or terse and fast.") + "-v", + "--verbose", + action="store_true", + default=False, + help="Choose between verbose and slower or terse and fast.", + ) subparsers.add_parser("traffic_meter", help="Output router's traffic meter data") @@ -94,7 +126,9 @@ def run_subcommand(netgear, args): subcommand = args.subcommand if subcommand == "block_device" or subcommand == "allow_device": - return netgear.allow_block_device(args.mac_addr, BLOCK if subcommand == "block_device" else ALLOW) + return netgear.allow_block_device( + args.mac_addr, BLOCK if subcommand == "block_device" else ALLOW + ) if subcommand == "attached_devices": if args.verbose: @@ -102,10 +136,10 @@ def run_subcommand(netgear, args): else: return netgear.get_attached_devices() - if subcommand == 'traffic_meter': + if subcommand == "traffic_meter": return netgear.get_traffic_meter() - if subcommand == 'login': + if subcommand == "login": return netgear.login() print("Unknown subcommand") @@ -115,9 +149,17 @@ def main(): """Scan for devices and print results.""" args = argparser().parse_args(sys.argv[1:]) - password = os.environ.get('PYNETGEAR_PASSWORD') or args.password - - netgear = Netgear(password, args.host, args.user, args.port, args.ssl, args.url, args.force_login_v2) + password = os.environ.get("PYNETGEAR_PASSWORD") or args.password + + netgear = Netgear( + password, + args.host, + args.user, + args.port, + args.ssl, + args.url, + args.force_login_v2, + ) results = run_subcommand(netgear, args) formatter = make_formatter(args.format) @@ -129,5 +171,5 @@ def main(): formatter(results) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/pynetgear/const.py b/pynetgear/const.py new file mode 100644 index 0000000..4cf1fff --- /dev/null +++ b/pynetgear/const.py @@ -0,0 +1,148 @@ +# encoding: utf-8 +"""Constants for pynetgear.""" + +# --------------------- +# DEFAULTS +# --------------------- +DEFAULT_HOST = "routerlogin.net" +DEFAULT_USER = "admin" +DEFAULT_PORT = 5000 +ALL_PORTS = [(5555, True), (443, True), (5000, False), (80, False)] + +BLOCK = "Block" +ALLOW = "Allow" + +UNKNOWN_DEVICE_DECODED = "" +UNKNOWN_DEVICE_ENCODED = "<unknown>" + +# Until we know how to generate it, give the one we captured +SESSION_ID = "A7D88AE69687E58D9A00" + +# --------------------- +# SERVICE +# --------------------- +SERVICE_PREFIX = "urn:NETGEAR-ROUTER:service:" +SERVICE_DEVICE_INFO = "DeviceInfo:1" +SERVICE_DEVICE_CONFIG = "DeviceConfig:1" +SERVICE_LAN_CONFIG_SECURITY = "LANConfigSecurity:1" +SERVICE_WAN_IP_CONNECTION = "WANIPConnection:1" +SERVICE_PARENTAL_CONTROL = "ParentalControl:1" +SERVICE_ADVANCED_QOS = "AdvancedQoS:1" +SERVICE_WLAN_CONFIGURATION = "WLANConfiguration:1" +SERVICE_USER_OPTIONS_TC = "UserOptionsTC:1" + +# --------------------- +# SERVICE_DEVICE_CONFIG +# --------------------- +LOGIN = "SOAPLogin" +# LOGOUT = 'SOAPLogout' +REBOOT = "Reboot" +CHECK_NEW_FIRMWARE = "CheckNewFirmware" +# UPDATE_NEW_FIRMWARE = 'UpdateNewFirmware' +CHECK_APP_NEW_FIRMWARE = "CheckAppNewFirmware" +CONFIGURATION_STARTED = "ConfigurationStarted" +CONFIGURATION_FINISHED = "ConfigurationFinished" + +# BLOCK/ALLOW DEVICE +SET_BLOCK_DEVICE_ENABLE = "SetBlockDeviceEnable" +GET_BLOCK_DEVICE_ENABLE_STATUS = "GetBlockDeviceEnableStatus" +# ENABLE_BLOCK_DEVICE_FOR_ALL = 'EnableBlockDeviceForAll' # deprecated? +SET_BLOCK_DEVICE_BY_MAC = "SetBlockDeviceByMAC" + +# TRAFFIC METER +GET_TRAFFIC_METER_STATISTICS = "GetTrafficMeterStatistics" +ENABLE_TRAFFIC_METER = "EnableTrafficMeter" +GET_TRAFFIC_METER_ENABLED = "GetTrafficMeterEnabled" +# SET_TRAFFIC_METER_OPTIONS = 'SetTrafficMeterOptions' +GET_TRAFFIC_METER_OPTIONS = "GetTrafficMeterOptions" + + +# --------------------- +# SERVICE_PARENTAL_CONTROL +# --------------------- +LOGIN_OLD = "Authenticate" +ENABLE_PARENTAL_CONTROL = "EnableParentalControl" +GET_PARENTAL_CONTROL_ENABLE_STATUS = "GetEnableStatus" +GET_ALL_MAC_ADDRESSES = "GetAllMACAddresses" +# SET_DNS_MASQ_DEVICE_ID = 'SetDNSMasqDeviceID' +GET_DNS_MASQ_DEVICE_ID = "GetDNSMasqDeviceID" +# DELETE_MAC_ADDRESS = 'DeleteMACAddress' + +# --------------------- +# SERVICE_DEVICE_INFO +# --------------------- +GET_SUPPORT_FEATURE_LIST_XML = "GetSupportFeatureListXML" +GET_ATTACHED_DEVICES = "GetAttachDevice" +GET_ATTACHED_DEVICES_2 = "GetAttachDevice2" +# SET_DEVICE_NAME_ICON_BY_MAC = 'SetDeviceNameIconByMAC' +# SET_DEVICE_NAME = 'SetNetgearDeviceName' + +# --------------------- +# SERVICE_ADVANCED_QOS +# --------------------- +SET_SPEED_TEST_START = "SetOOKLASpeedTestStart" +GET_SPEED_TEST_RESULT = "GetOOKLASpeedTestResult" +SET_QOS_ENABLE_STATUS = "SetQoSEnableStatus" +GET_QOS_ENABLE_STATUS = "GetQoSEnableStatus" +# SET_BANDWIDTH_CONTROL_OPTIONS = 'SetBandwidthControlOptions' +GET_BANDWIDTH_CONTROL_OPTIONS = "GetBandwidthControlOptions" +GET_CURRENT_APP_BANDWIDTH = "GetCurrentAppBandwidth" # Not Working +GET_CURRENT_DEVICE_BANDWIDTH = "GetCurrentDeviceBandwidth" # Not Working +GET_CURRENT_APP_BANDWIDTH_BY_MAC = "GetCurrentAppBandwidthByMAC" # Not Working + +# --------------------- +# SERVICE_WLAN_CONFIGURATION +# --------------------- +SET_GUEST_ACCESS_ENABLED = "SetGuestAccessEnabled" # 2.4G-1 R7800 +GET_GUEST_ACCESS_ENABLED = "GetGuestAccessEnabled" # 2.4G-1 R7800/R8000 +SET_GUEST_ACCESS_ENABLED_2 = "SetGuestAccessEnabled2" # 2.4G-1 R8000 +GET_GUEST_ACCESS_ENABLED_2 = "GetGuestAccessEnabled2" # 2.4G-1 R8000 +SET_5G_GUEST_ACCESS_ENABLED = "Set5GGuestAccessEnabled" # 5G-1 R7800 +GET_5G1_GUEST_ACCESS_ENABLED = "Get5GGuestAccessEnabled" # 5G-1 R7800 +GET_5G1_GUEST_ACCESS_ENABLED_2 = "Get5G1GuestAccessEnabled" # 5G-1 R8000 +SET_5G1_GUEST_ACCESS_ENABLED_2 = "Set5G1GuestAccessEnabled2" # 5G-1 R8000 +SET_5G_GUEST_ACCESS_ENABLED_2 = "Set5GGuestAccessEnabled2" # 5G-2 R8000 +GET_5G_GUEST_ACCESS_ENABLED_2 = "Get5GGuestAccessEnabled2" # 5G-2 R8000 +GET_WPA_SECURITY_KEYS = "GetWPASecurityKeys" +GET_5G_WPA_SECURITY_KEYS = "Get5GWPASecurityKeys" +GET_5G_INFO = "Get5GInfo" +# SET_5G_WLAN_WPA_PSK_BY_PASSPHRASE = 'Set5GWLANWPAPSKByPassphrase' +GET_AVAILABLE_CHANNEL = "GetAvailableChannel" +# SET_GUEST_ACCESS_NETWORK = 'SetGuestAccessNetwork' +GET_GUEST_ACCESS_NETWORK_INFO = "GetGuestAccessNetworkInfo" +# SET_5G_GUEST_ACCESS_NETWORK = 'Set5GGuestAccessNetwork' +GET_5G_GUEST_ACCESS_NETWORK_INFO = "Get5GGuestAccessNetworkInfo" +GET_SMART_CONNECT_ENABLED = "IsSmartConnectEnabled" +SET_SMART_CONNECT_ENABLED = "SetSmartConnectEnable" + +# --------------------- +# SERVICE_USER_OPTIONS_TC +# --------------------- +SET_USER_OPTION_TC = "SetUserOptionsTC" + +# --------------------- +# FORMATTING +# --------------------- +SOAP_REQUEST = """ + + +{session_id} + +{body} + +""" + +LOGIN_V1_BODY = """ + + {username} + {password} + +""" + +CALL_BODY = """ + +{params} +""" diff --git a/pynetgear/helpers.py b/pynetgear/helpers.py new file mode 100644 index 0000000..d9d4cdb --- /dev/null +++ b/pynetgear/helpers.py @@ -0,0 +1,201 @@ +# encoding: utf-8 +"""Helper functions for pynetgear.""" +from io import StringIO +import xml.etree.ElementTree as ET +import re +import sys +import requests +import logging + +from .const import SERVICE_PREFIX + +_LOGGER = logging.getLogger(__name__) + +# define regex to filter invalid XML codes +# cf https://stackoverflow.com/questions/1707890/fast-way-to-filter-illegal-xml-unicode-chars-in-python # noqa: E501 +if sys.version_info[0] == 3: + unichr = chr +_illegal_unichrs = [ + (0x00, 0x08), + (0x0B, 0x0C), + (0x0E, 0x1F), + (0x7F, 0x84), + (0x86, 0x9F), + (0xFDD0, 0xFDDF), + (0xFFFE, 0xFFFF), +] +if sys.maxunicode >= 0x10000: # not narrow build + _illegal_unichrs.extend( + [ + (0x1FFFE, 0x1FFFF), + (0x2FFFE, 0x2FFFF), + (0x3FFFE, 0x3FFFF), + (0x4FFFE, 0x4FFFF), + (0x5FFFE, 0x5FFFF), + (0x6FFFE, 0x6FFFF), + (0x7FFFE, 0x7FFFF), + (0x8FFFE, 0x8FFFF), + (0x9FFFE, 0x9FFFF), + (0xAFFFE, 0xAFFFF), + (0xBFFFE, 0xBFFFF), + (0xCFFFE, 0xCFFFF), + (0xDFFFE, 0xDFFFF), + (0xEFFFE, 0xEFFFF), + (0xFFFFE, 0xFFFFF), + (0x10FFFE, 0x10FFFF), + ] + ) + +_illegal_ranges = [ + "%s-%s" % (unichr(low), unichr(high)) for (low, high) in _illegal_unichrs +] + +_illegal_xml_chars_RE = re.compile("[%s]" % "".join(_illegal_ranges)) + + +def autodetect_url(): + """ + Try to autodetect the base URL of the router SOAP service. + + Returns None if it can't be found. + """ + DETECTABLE_URLS = [ + "http://routerlogin.net", + "http://routerlogin.net:5000", + "https://routerlogin.net", + ] + for url in DETECTABLE_URLS: + try: + resp = requests.get( + url + "/soap/server_sa/", + headers=get_soap_headers("Test:1", "test"), + verify=False, + timeout=1, + ) + if resp.status_code == 200: + return url + except requests.exceptions.RequestException: + pass + + return None + + +def find_node(text, xpath): + text = _illegal_xml_chars_RE.sub("", text) + + try: + it = ET.iterparse(StringIO(text)) + # strip all namespaces + for _, el in it: + if "}" in el.tag: + el.tag = el.tag.split("}", 1)[1] + except ET.ParseError: + _LOGGER.error("Error parsing XML response") + _LOGGER.debug("Error parsing XML response", exc_info=True) + _LOGGER.debug(text) + return False, None + + node = it.root.find(xpath) + if node is None: + _LOGGER.error("Error finding node in XML response") + _LOGGER.debug(text) + return False, None + + return True, node + + +def xml_get(e, name): + """ + Returns the value of the subnode "name" of element e. + + Returns None if the subnode doesn't exist + """ + r = e.find(name) + if r is not None: + return r.text + return None + + +def get_soap_headers(service, method): + action = SERVICE_PREFIX + service + "#" + method + return { + "SOAPAction": action, + "Cache-Control": "no-cache", + "User-Agent": "pynetgear", + "Content-Type": "multipart/form-data", + } + + +def is_valid_response(resp): + return resp.status_code == 200 and ( + "000000002001401" in resp.text) + + +def is_service_unavailable_response(resp): + return (resp.status_code == 503 or + "503" in resp.text) + + +def is_service_not_found_response(resp): + return (resp.status_code == 404 or + "404" in resp.text) + + +def convert(value, to_type, default=None): + """Convert value to to_type, returns default if fails.""" + try: + return default if value is None else to_type(value) + except ValueError: + # If value could not be converted + return default + + +def value_to_zero_or_one(s): + """Convert value to 1 or 0 string.""" + if isinstance(s, str): + if s.lower() in ("true", "t", "yes", "y", "1"): + return "1" + if s.lower() in ("false", "f", "no", "n", "0"): + return "0" + if isinstance(s, bool): + if s: + return "1" + return "0" + + raise ValueError("Cannot covert {} to a 1 or 0".format(s)) + + +def zero_or_one_to_boolean(s): + """Convert 1 or 0 string to boolean.""" + if isinstance(s, str): + if s == "1": + return True + if s == "0": + return False + if isinstance(s, bool): + return s + + raise ValueError("Cannot covert {} to a boolean".format(s)) + + +def zero_or_one_dict_to_boolean(d): + """Convert a dict of one key with a 1 or 0 string to boolean.""" + if isinstance(d, dict): + if len(d) == 1: + return zero_or_one_to_boolean(d.popitem()[1]) + + raise ValueError("Cannot covert {} to a boolean".format(d)) diff --git a/pynetgear/router.py b/pynetgear/router.py new file mode 100644 index 0000000..29eb134 --- /dev/null +++ b/pynetgear/router.py @@ -0,0 +1,1131 @@ +"""Module to communicate with Netgear routers using the SOAP v2 API.""" +from __future__ import print_function + +from collections import namedtuple +import logging +from datetime import timedelta +from time import sleep + +import requests +from urllib3 import disable_warnings +from urllib3.exceptions import InsecureRequestWarning + +from . import const as c +from . import helpers as h + +_LOGGER = logging.getLogger(__name__) + +disable_warnings(InsecureRequestWarning) + + +Device = namedtuple( + "Device", + [ + "name", + "ip", + "mac", + "type", + "signal", + "link_rate", + "allow_or_block", + "device_type", + "device_model", + "ssid", + "conn_ap_mac", + ], +) + + +class Netgear(object): + """Represents a session to a Netgear Router.""" + + def __init__( + self, + password=None, + host=None, + user=None, + port=None, + ssl=False, + url=None, + force_login_v1=False, + force_login_v2=False, + ): + """Initialize a Netgear session.""" + if not url and not host and not port: + url = h.autodetect_url() + + if not host: + host = c.DEFAULT_HOST + if not port: + port = c.DEFAULT_PORT + if not user: + user = c.DEFAULT_USER + + self.username = user + self.password = password + self.url = url + self.host = host + self.port = port + self.ssl = ssl + self.force_login_v1 = force_login_v1 + self.force_login_v2 = force_login_v2 + self.cookie = None + self.config_started = False + self._logging_in = False + self._login_version = 2 + + self._info = None + + self.guest_2g_methods = [ + c.GET_GUEST_ACCESS_ENABLED, + c.GET_GUEST_ACCESS_ENABLED_2, + ] + self.guest_5g_methods = [ + c.GET_5G1_GUEST_ACCESS_ENABLED, + c.GET_5G1_GUEST_ACCESS_ENABLED_2, + c.GET_5G_GUEST_ACCESS_ENABLED_2, + ] + self.guest_2g_set_methods = [ + c.SET_GUEST_ACCESS_ENABLED, + c.SET_GUEST_ACCESS_ENABLED_2, + ] + self.guest_5g_set_methods = [ + c.SET_5G_GUEST_ACCESS_ENABLED, + c.SET_5G1_GUEST_ACCESS_ENABLED_2, + c.SET_5G_GUEST_ACCESS_ENABLED_2, + ] + + @property + def soap_url(self): + """SOAP url to connect to the router.""" + if self.url: + return self.url + "/soap/server_sa/" + + scheme = "https" if self.ssl else "http" + return "{}://{}:{}/soap/server_sa/".format( + scheme, self.host, self.port) + + def _get_headers(self, service, method, need_auth=True): + headers = h.get_soap_headers(service, method) + # if the stored cookie is not a str then we are + # probably using the old login method + if need_auth and isinstance(self.cookie, str): + headers["Cookie"] = self.cookie + return headers + + def _post_request(self, headers, message): + """Post the API request to the router.""" + return requests.post( + self.soap_url, + headers=headers, + data=message, + timeout=30, + verify=False, + ) + + def _make_request( + self, + service, + method, + params=None, + body="", + need_auth=True, + check=True, + ): + """Make an API request to the router.""" + # If we have no cookie (v2) or never called login before (v1) + # and we need auth, the request will fail for sure. + if need_auth and not self.cookie: + if not self.login(): + return False, None + + headers = self._get_headers(service, method, need_auth) + + if not body: + if not params: + params = "" + if isinstance(params, dict): + _map = params + params = "" + for k in _map: + params += "<" + k + ">" + _map[k] + "\n" + + body = c.CALL_BODY.format( + service=c.SERVICE_PREFIX + service, + method=method, + params=params, + ) + + message = c.SOAP_REQUEST.format(session_id=c.SESSION_ID, body=body) + + try: + try: + response = self._post_request(headers, message) + except requests.exceptions.SSLError: + _LOGGER.debug( + "SSL error, thread as unauthorized response " + "and try again after re-login" + ) + response = requests.Response() + response.status_code = 401 + + if need_auth and h.is_unauthorized_response(response): + # let's discard the cookie because it probably expired (v2) + # or the IP-bound session expired (v1) + self.cookie = None + + _LOGGER.debug( + "Unauthorized response, " "let's login and retry..." + ) + if not self.login(): + _LOGGER.error("Unauthorized response, re-login failed") + return False, response + + # reset headers with new cookie first and re-try + headers = self._get_headers(service, method, need_auth) + response = self._post_request(headers, message) + + success = h.is_valid_response(response) + if not success: + if h.is_unauthorized_response(response): + err_mess = ( + "Unauthorized response, " + "after seemingly successful re-login" + ) + elif h.is_service_unavailable_response(response): + sleep(3) + # try the request one more time + response = self._post_request(headers, message) + success = h.is_valid_response(response) + if not success: + err_mess = ( + "503 Service Unavailable after retry, " + "the API may be overloaded." + ) + elif h.is_service_not_found_response(response): + err_mess = ( + "404 service '%s', method '%s' not found" + % (service, method) + ) + else: + err_mess = ( + "Invalid response: %s\n%s\n%s" + % (response.status_code, + str(response.headers), + response.text) + ) + if not success and not self._logging_in: + if check: + _LOGGER.error(err_mess) + elif not success: + _LOGGER.debug(err_mess) + + return success, response + + except requests.exceptions.RequestException: + if not self._logging_in: + _LOGGER.exception("Error talking to API") + else: + _LOGGER.debug("RequestException while logging in " + "port %s ssl %s", self.port, self.ssl) + self.cookie = None + + # Maybe one day we will distinguish between + # different errors.. + return False, None + + def config_start(self): + """ + Start a configuration session. + For managing router admin functionality (ie allowing/blocking devices) + """ + _LOGGER.debug("Config start") + + success, _ = self._make_request( + c.SERVICE_DEVICE_CONFIG, + c.CONFIGURATION_STARTED, + params={"NewSessionID": c.SESSION_ID}, + ) + + self.config_started = success + return success + + def config_finish(self): + """ + End of a configuration session. + Tells the router we're done managing admin functionality. + """ + _LOGGER.debug("Config finish") + if not self.config_started: + return True + + success, _ = self._make_request( + c.SERVICE_DEVICE_CONFIG, + c.CONFIGURATION_FINISHED, + params={"NewStatus": "ChangesApplied"}, + ) + + self.config_started = not success + return success + + def _get( + self, + service, + method, + parseNode=None, + parse_text=lambda text: text, + check=True, + ): + """Get information using a service and method from the router.""" + if parseNode is None: + parseNode = ".//%sResponse" % (method) + + _LOGGER.debug("Call %s", method) + success, response = self._make_request( + service, + method, + check=check, + ) + if not success: + _LOGGER.debug("Could not successfully get %s", method) + return None + + success, node = h.find_node(response.text, parseNode) + if not success: + _LOGGER.debug("Could not parse response for %s", method) + return None + + return {t.tag: parse_text(t.text) for t in node} + + def _set(self, service, method, params=None): + """Set router parameters using a service, method and params.""" + _LOGGER.debug("Call %s", method) + if self.config_started: + _LOGGER.error( + "Inconsistant configuration state, " + "configuration already started" + ) + if not self.config_finish(): + return False + + if not self.config_start(): + _LOGGER.error("Could not start configuration") + return False + + success, _ = self._make_request(service, method, params) + + if not success: + _LOGGER.error( + "Could not successfully call '%s' with params '%s'", + method, + params, + ) + return False + + if method == c.REBOOT: + self.config_started = False + return True + + if not self.config_finish(): + _LOGGER.error( + "Inconsistant configuration state, " + "configuration already finished" + ) + return False + + return True + + def _get_methods(self, service, method_list): + for idx in range(len(method_list)): + method = method_list[idx] + response = self._get( + service, + method, + check=False, + ) + if response is not None: + if idx != 0: # move to front for next time + method_list.insert(0, method_list.pop(idx)) + break + + return response + + def _set_methods( + self, service, method_list, params, get_function, expected + ): + for idx in range(len(method_list)): + method = method_list[idx] + response = self._set( + service, + method, + params, + ) + if not response: + continue + if get_function() == expected: + if idx != 0: # move to front for next time + method_list.insert(0, method_list.pop(idx)) + return True + + return False + + def login_try_port(self): + # first try the currently configured port-ssl combination + current_port = (self.port, self.ssl) + if self.login(): + return True + + ports = c.ALL_PORTS.copy() + if current_port in ports: + ports.remove(current_port) + + for port in ports: + self.port = port[0] + self.ssl = port[1] + if self.login(): + _LOGGER.info( + "Login succeeded using non default port " + "'%i' and ssl '%r'.", + self.port, + self.ssl, + ) + return True + + # reset original port-ssl + self.port = current_port[0] + self.ssl = current_port[1] + _LOGGER.error("login using all known port-ssl combinations failed.") + return False + + def login(self): + """ + Login to the router. + + Will be called automatically by other actions. + """ + if self._logging_in: + _LOGGER.debug("Login re-attempt within the login, ignoring.") + return False + self._logging_in = True + + # cookie is also used to track if at least + # one login attempt has been made for v1 + self.cookie = None + + # if a force option is given always start with that method + if self.force_login_v1: + self._login_version = 1 + if self.force_login_v2: + self._login_version = 2 + + login_methods = [self.login_v1, self.login_v2] + for idx in range(0, len(login_methods)): + login_version = (idx + self._login_version) % len(login_methods) + login_method = login_methods[login_version - 1] + if login_method(): + # login succeeded, next time start with this login method + self._logging_in = False + self._login_version = login_version + return True + + # login failed, next time start trying with the other login method + self._logging_in = False + self._login_version = self._login_version + 1 + return False + + def login_v2(self): + _LOGGER.debug("Login v2, port '%i', ssl, '%r'", self.port, self.ssl) + + success, response = self._make_request( + c.SERVICE_DEVICE_CONFIG, + c.LOGIN, + params={"Username": self.username, "Password": self.password}, + need_auth=False, + ) + + if not success: + return False + + if "Set-Cookie" in response.headers: + self.cookie = response.headers["Set-Cookie"] + else: + _LOGGER.error("Login v2 ok but no cookie...") + _LOGGER.debug(response.headers) + return False + + return True + + def login_v1(self): + _LOGGER.debug("Login v1, port '%i', ssl, '%r'", self.port, self.ssl) + + body = c.LOGIN_V1_BODY.format( + username=self.username, password=self.password + ) + + success, _ = self._make_request( + c.SERVICE_PARENTAL_CONTROL, c.LOGIN_OLD, body=body, need_auth=False + ) + + self.cookie = success + + # check login succes with info call + if self.get_info(use_cache=False) is None: + return False + + return True + + def get_info(self, use_cache=True): + """ + Return router informations, like: + - ModelName + - DeviceName + - SerialNumber + - Firmwareversion + - FirewallVersion + - Hardwareversion + - FirmwareLastUpdate + - FirmwareLastChecked + + Returns None if error occurred. + """ + if self._info is not None and use_cache: + _LOGGER.debug("Info from cache.") + return self._info + + response = self._get( + c.SERVICE_DEVICE_INFO, + "GetInfo", + ) + if response is None: + return None + + self._info = response + return self._info + + def get_attached_devices(self): + """ + Return list of connected devices to the router. + + Returns None if error occurred. + """ + _LOGGER.debug("Get attached devices") + + success, response = self._make_request( + c.SERVICE_DEVICE_INFO, c.GET_ATTACHED_DEVICES + ) + + if not success: + _LOGGER.error("Get attached devices failed") + return None + + success, node = h.find_node( + response.text, ".//GetAttachDeviceResponse/NewAttachDevice" + ) + if not success: + return None + + devices = [] + + # Netgear inserts a double-encoded value for "unknown" devices + decoded = node.text.strip().replace( + c.UNKNOWN_DEVICE_ENCODED, c.UNKNOWN_DEVICE_DECODED + ) + + if not decoded or decoded == "0": + _LOGGER.info("Can't parse attached devices string") + return devices + + entries = decoded.split("@") + + # First element is the total device count + entry_count = None + if len(entries) > 1: + entry_count = h.convert(entries.pop(0), int) + + if entry_count is not None and entry_count != len(entries): + _LOGGER.info( + "Number of devices should be: %d but is: %d", + entry_count, + len(entries), + ) + + for entry in entries: + info = entry.split(";") + + if len(info) == 0: + continue + + # Not all routers will report those + signal = None + link_type = None + link_rate = None + allow_or_block = None + mac = None + name = None + + if len(info) >= 8: + allow_or_block = info[7] + if len(info) >= 7: + link_type = info[4] + link_rate = h.convert(info[5], int) + signal = h.convert(info[6], int) + if len(info) >= 4: + mac = info[3] + if len(info) >= 3: + name = info[2] + + if len(info) < 2: + _LOGGER.warning("Unexpected entry: %s", info) + continue + + ipv4 = info[1] + + devices.append( + Device( + name, + ipv4, + mac, + link_type, + signal, + link_rate, + allow_or_block, + None, + None, + None, + None, + ) + ) + + return devices + + def get_attached_devices_2(self): + """ + Return list of connected devices to the router with details. + + This call is slower and probably heavier on the router load. + + Returns None if error occurred. + """ + _LOGGER.debug("Get attached devices 2") + + success, response = self._make_request( + c.SERVICE_DEVICE_INFO, c.GET_ATTACHED_DEVICES_2 + ) + if not success: + return None + + success, devices_node = h.find_node( + response.text, ".//GetAttachDevice2Response/NewAttachDevice" + ) + if not success: + return None + + xml_devices = devices_node.findall("Device") + devices = [] + for d in xml_devices: + ip = h.xml_get(d, "IP") + name = h.xml_get(d, "Name") + mac = h.xml_get(d, "MAC") + signal = h.convert(h.xml_get(d, "SignalStrength"), int) + link_type = h.xml_get(d, "ConnectionType") + link_rate = h.xml_get(d, "Linkspeed") + allow_or_block = h.xml_get(d, "AllowOrBlock") + device_type = h.convert(h.xml_get(d, "DeviceType"), int) + device_model = h.xml_get(d, "DeviceModel") + ssid = h.xml_get(d, "SSID") + conn_ap_mac = h.xml_get(d, "ConnAPMAC") + devices.append( + Device( + name, + ip, + mac, + link_type, + signal, + link_rate, + allow_or_block, + device_type, + device_model, + ssid, + conn_ap_mac, + ) + ) + + return devices + + def get_traffic_meter(self): + """ + Return dict of traffic meter stats, like: + - NewTodayConnectionTime + - NewTodayUpload + - NewTodayDownload + - NewYesterdayConnectionTime + - NewYesterdayUpload + - NewYesterdayDownload + - NewWeekConnectionTime + - NewWeekUpload + - NewWeekDownload + - NewMonthConnectionTime + - NewMonthUpload + - NewMonthDownload + - NewLastMonthConnectionTime + - NewLastMonthUpload + - NewLastMonthDownload + + Returns None if error occurred. + """ + + def parse_text(text): + """ + there are three kinds of values in the returned data + This function parses the different values and returns + (total, avg), timedelta or a plain float + """ + + def tofloats(lst): + return (float(t) for t in lst) + + try: + text = text.replace(",", "") # 25,350.10 MB + if "--" in text: + return None + if "/" in text: # "6.19/0.88" total/avg + return tuple(tofloats(text.split("/"))) + if ":" in text: # 11:14 hr:mn + hour, mins = tofloats(text.split(":")) + return timedelta(hours=hour, minutes=mins) + return float(text) + except ValueError: + _LOGGER.error("Error parsing traffic meter stats: %s", text) + return None + + return self._get( + c.SERVICE_DEVICE_CONFIG, + "GetTrafficMeterStatistics", + parse_text=parse_text, + ) + + def allow_block_device(self, mac_addr, device_status=c.BLOCK): + """ + Allow or Block a device via its Mac Address. + Pass in the mac address for the device that you want to set. + Pass in the device_status you wish to set the device to: + Allow (allow device to access the network) + or Block (block the device from accessing the network). + """ + return self._set( + c.SERVICE_DEVICE_CONFIG, + "SetBlockDeviceByMAC", + {"NewAllowOrBlock": device_status, "NewMACAddress": mac_addr}, + ) + + def reboot(self): + """Reboot the router""" + return self._set(c.SERVICE_DEVICE_CONFIG, c.REBOOT) + + def check_new_firmware(self): + """ + Check for new firmware and return dict like: + - CurrentVersion + - NewVersion + - ReleaseNote + """ + return self._get( + c.SERVICE_DEVICE_CONFIG, + c.CHECK_NEW_FIRMWARE, + ) + + def get_device_config_info(self): + """ + Get Device Config Info and return dict like: + - BlankState + - NewBlockSiteEnable + - NewBlockSiteName + - NewTimeZone + - NewDaylightSaving + """ + return self._get( + c.SERVICE_DEVICE_CONFIG, + "GetInfo", + ) + + def get_block_device_enable_status(self): + """Get Block Device Enable Status and return boolean.""" + response = self._get( + c.SERVICE_DEVICE_CONFIG, c.GET_BLOCK_DEVICE_ENABLE_STATUS + ) + return h.zero_or_one_dict_to_boolean(response) + + def set_block_device_enable(self, value=False): + """Set SetBlockDeviceEnable.""" + value = h.value_to_zero_or_one(value) + return self._set( + c.SERVICE_DEVICE_CONFIG, + c.SET_BLOCK_DEVICE_ENABLE, + {"NewBlockDeviceEnable": value}, + ) + + def get_traffic_meter_enabled(self): + """Get Traffic Meter Enabled and return boolean.""" + response = self._get( + c.SERVICE_DEVICE_CONFIG, c.GET_TRAFFIC_METER_ENABLED + ) + return h.zero_or_one_dict_to_boolean(response) + + def get_traffic_meter_options(self): + """ + Get Traffic Meter Options and return dict like: + - NewControlOption + - NewMonthlyLimit + - RestartHour + - RestartMinute + - RestartDay + """ + return self._get(c.SERVICE_DEVICE_CONFIG, c.GET_TRAFFIC_METER_OPTIONS) + + def enable_traffic_meter(self, value=False): + """Set EnableTrafficMeter.""" + value = h.value_to_zero_or_one(value) + return self._set( + c.SERVICE_DEVICE_CONFIG, + c.ENABLE_TRAFFIC_METER, + {"NewTrafficMeterEnable": value}, + ) + + def get_lan_config_sec_info(self): + """ + Get LAN Config Security Info and return dict like: + - NewLANSubnet + - NewWANLAN_Subnet_Match + - NewLANMACAddress + - NewLANIP + - NewDHCPEnabled + """ + return self._get( + c.SERVICE_LAN_CONFIG_SECURITY, + "GetInfo", + ) + + def get_wan_ip_con_info(self): + """ + Get WAN IP Connection Info and return dict like: + - NewEnable + - NewConnectionType + - NewExternalIPAddress + - NewSubnetMask + - NewAddressingType + - NewDefaultGateway + - NewMACAddress + - NewMACAddressOverride + - NewMaxMTUSize + - NewDNSEnabled + - NewDNSServers + """ + return self._get( + c.SERVICE_WAN_IP_CONNECTION, + "GetInfo", + ) + + def get_parental_control_enable_status(self): + """Get parental control enable status and return boolean.""" + response = self._get( + c.SERVICE_PARENTAL_CONTROL, c.GET_PARENTAL_CONTROL_ENABLE_STATUS + ) + return h.zero_or_one_dict_to_boolean(response) + + def enable_parental_control(self, value=False): + """Set EnableParentalControl.""" + value = h.value_to_zero_or_one(value) + return self._set( + c.SERVICE_PARENTAL_CONTROL, + c.ENABLE_PARENTAL_CONTROL, + {"NewEnable": value}, + ) + + def get_all_mac_addresses(self): + """ + Get All MAC Addresses for parental control and return dict like: + - AllMACAddresses + """ + return self._get( + c.SERVICE_PARENTAL_CONTROL, + c.GET_ALL_MAC_ADDRESSES, + ) + + def get_dns_masq_device_id(self): + """ + Get DNS Masq Device ID and return dict like: + - NewDeviceID + """ + return self._get( + c.SERVICE_PARENTAL_CONTROL, + c.GET_DNS_MASQ_DEVICE_ID, + ) + + def get_support_feature_list(self): + """ + Get Support Feature List and return dict like: + - DynamicQoS + - OpenDNSParentalControl + - MaxMonthlyTrafficLimitation + - AccessControl + - SpeedTest + - GuestNetworkSchedule + - TCAcceptance + - SmartConnect + - AttachedDevice + - NameNTGRDevice + - PasswordReset + """ + return self._get( + c.SERVICE_DEVICE_INFO, + c.GET_SUPPORT_FEATURE_LIST_XML, + parseNode=( + ".//%sResponse/newFeatureList/features" + % (c.GET_SUPPORT_FEATURE_LIST_XML) + ), + ) + + def set_speed_test_start(self): + """Start the speed test.""" + return self._set( + c.SERVICE_ADVANCED_QOS, + c.SET_SPEED_TEST_START, + ) + + def get_speed_test_result(self): + """ + Get the speed test result and return dict like: + - NewOOKLAUplinkBandwidth + - NewOOKLADownlinkBandwidth + - AveragePing + + Response Code = 1 means in progress + """ + _LOGGER.debug("Retrieving speed test result") + for _retry in range(1, 10): + success, response = self._make_request( + c.SERVICE_ADVANCED_QOS, + c.GET_SPEED_TEST_RESULT, + check=False, + ) + if response.status_code != 200: + _LOGGER.debug( + "Could not successfully get %s", c.GET_SPEED_TEST_RESULT + ) + return None + + success, node = h.find_node(response.text, ".//ResponseCode") + if not success: + _LOGGER.debug("Could not parse response for speed test result") + return None + + if node.text == "0": # new test done + break + if node.text == "1": # test in progress + sleep(2) + continue + if node.text == "501": # old test result + _LOGGER.warning("old speed test result returned") + break + _LOGGER.error( + "Unexpected return code for speed test: '%s'", node.text + ) + return None + + success, node = h.find_node( + response.text, ".//%sResponse" % (c.GET_SPEED_TEST_RESULT) + ) + if not success: + _LOGGER.debug("Could not parse response for speed test result") + return None + + return {t.tag: t.text for t in node} + + def get_new_speed_test_result(self): + """ + Request a new speed test and get the results and return dict like: + - NewOOKLAUplinkBandwidth + - NewOOKLADownlinkBandwidth + - AveragePing + + Response Code = 1 means in progress + """ + if not self.set_speed_test_start(): + return None + return self.get_speed_test_result() + + def get_qos_enable_status(self): + """ + Get QoS Enable Status and return dict like: + - NewQoSEnableStatus + """ + response = self._get( + c.SERVICE_ADVANCED_QOS, + c.GET_QOS_ENABLE_STATUS, + ) + return h.zero_or_one_dict_to_boolean(response) + + def set_qos_enable_status(self, value=False): + """Set QoS Enable Status.""" + value = h.value_to_zero_or_one(value) + return self._set( + c.SERVICE_ADVANCED_QOS, + c.SET_QOS_ENABLE_STATUS, + {"NewQoSEnable": value}, + ) + + def get_bandwidth_control_options(self): + """ + Get Bandwidth Control Options and return dict like: + - NewUplinkBandwidth + - NewDownlinkBandwidth + - NewSettingMethod + """ + return self._get( + c.SERVICE_ADVANCED_QOS, + c.GET_BANDWIDTH_CONTROL_OPTIONS, + ) + + def get_2g_guest_access_enabled(self): + """Get 2.4G Guest Access Enabled and return boolean.""" + response = self._get_methods( + c.SERVICE_WLAN_CONFIGURATION, + self.guest_2g_methods, + ) + return h.zero_or_one_dict_to_boolean(response) + + def get_5g_guest_access_enabled(self): + """Get 5G Guest Access Enabled and return boolean""" + response = self._get_methods( + c.SERVICE_WLAN_CONFIGURATION, + self.guest_5g_methods, + ) + return h.zero_or_one_dict_to_boolean(response) + + def set_2g_guest_access_enabled(self, value=False): + """Set Guest Access Enabled.""" + value = h.value_to_zero_or_one(value) + return self._set_methods( + c.SERVICE_WLAN_CONFIGURATION, + self.guest_2g_set_methods, + {"NewGuestAccessEnabled": value}, + self.get_2g_guest_access_enabled, + h.zero_or_one_to_boolean(value), + ) + + def set_5g_guest_access_enabled(self, value=False): + """Set 5G Guest Access Enabled.""" + value = h.value_to_zero_or_one(value) + return self._set_methods( + c.SERVICE_WLAN_CONFIGURATION, + self.guest_5g_set_methods, + {"NewGuestAccessEnabled": value}, + self.get_5g_guest_access_enabled, + h.zero_or_one_to_boolean(value), + ) + + def get_2g_wpa_security_keys(self): + """ + Get 2.4G WPA Security Keys and return dict like: + - NewWPAPassphrase + """ + return self._get( + c.SERVICE_WLAN_CONFIGURATION, + c.GET_WPA_SECURITY_KEYS, + ) + + def get_5g_wpa_security_keys(self): + """ + Get 5G WPA Security Keys and return dict like: + - NewWPAPassphrase + """ + return self._get( + c.SERVICE_WLAN_CONFIGURATION, + c.GET_5G_WPA_SECURITY_KEYS, + ) + + def get_5g_info(self): + """ + Get 5G Info and return dict like: + - NewEnable + - NewSSIDBroadcast + - NewStatus + - NewSSID + - NewRegion + - NewChannel + - NewWirelessMode + - NewBasicEncryptionModes + - NewWEPAuthType + - NewWPAEncryptionModes + - NewWLANMACAddress + """ + return self._get( + c.SERVICE_WLAN_CONFIGURATION, + c.GET_5G_INFO, + ) + + def get_2g_info(self): + """ + Get 2G Info and return dict like: + - NewEnable + - NewSSIDBroadcast + - NewStatus + - NewSSID + - NewRegion + - NewChannel + - NewWirelessMode + - NewBasicEncryptionModes + - NewWEPAuthType + - NewWPAEncryptionModes + - NewWLANMACAddress + """ + return self._get( + c.SERVICE_WLAN_CONFIGURATION, + "GetInfo", + ) + + def get_2g_guest_access_network_info(self): + """ + Get 2.4G Guest Access Network Info and return dict like: + - NewSSID + - NewSecurityMode + - NewKey + - UserSetSchedule + - Schedule + """ + return self._get( + c.SERVICE_WLAN_CONFIGURATION, + c.GET_GUEST_ACCESS_NETWORK_INFO, + ) + + def get_5g_guest_access_network_info(self): + """ + Get 5G Guest Access Network Info and return dict like: + - NewSSID + - NewSecurityMode + - NewKey + - UserSetSchedule + - Schedule + """ + return self._get( + c.SERVICE_WLAN_CONFIGURATION, c.GET_5G_GUEST_ACCESS_NETWORK_INFO + ) + + def get_smart_connect_enabled(self): + """ + Get Smart Connect Enabled and return dict like: + - NewSmartConnectEnable + """ + return self._get( + c.SERVICE_WLAN_CONFIGURATION, + c.GET_SMART_CONNECT_ENABLED, + ) + + def set_smart_connect_enabled(self, value=False): + """Set Smart Connect Enable.""" + value = h.value_to_zero_or_one(value) + return self._set( + c.SERVICE_WLAN_CONFIGURATION, + c.SET_SMART_CONNECT_ENABLED, + {"NewSmartConnectEnable": value}, + )