diff --git a/pynetgear/__init__.py b/pynetgear/__init__.py index 62f403e..ec3a982 100644 --- a/pynetgear/__init__.py +++ b/pynetgear/__init__.py @@ -1,76 +1,44 @@ +# encoding: utf-8 """Module to communicate with Netgear routers using the SOAP v2 API.""" from __future__ import print_function -from io import StringIO from collections import namedtuple import logging -import xml.etree.ElementTree as ET -from datetime import timedelta -import re -import sys - import requests +from . import const as c +from . import helpers as h -# define regex to filter invalid XML codes -# cf https://stackoverflow.com/questions/1707890/fast-way-to-filter-illegal-xml-unicode-chars-in-python -if sys.version_info[0] == 3: - unichr = chr -_illegal_unichrs = [(0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), - (0x7F, 0x84), (0x86, 0x9F), - (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF)] -if sys.maxunicode >= 0x10000: # not narrow build - _illegal_unichrs.extend([(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF), - (0x3FFFE, 0x3FFFF), (0x4FFFE, 0x4FFFF), - (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF), - (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF), - (0x9FFFE, 0x9FFFF), (0xAFFFE, 0xAFFFF), - (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF), - (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF), - (0xFFFFE, 0xFFFFF), (0x10FFFE, 0x10FFFF)]) - -_illegal_ranges = ["%s-%s" % (unichr(low), unichr(high)) - for (low, high) in _illegal_unichrs] -_illegal_xml_chars_RE = re.compile(u'[%s]' % u''.join(_illegal_ranges)) - - -DEFAULT_HOST = 'routerlogin.net' -DEFAULT_USER = 'admin' -DEFAULT_PORT = 5000 _LOGGER = logging.getLogger(__name__) - -BLOCK = "Block" -ALLOW = "Allow" - Device = namedtuple( "Device", ["name", "ip", "mac", "type", "signal", "link_rate", "allow_or_block", "device_type", "device_model", "ssid", "conn_ap_mac"]) -class Netgear(object): +class Netgear(): """Represents a session to a Netgear Router.""" - def __init__(self, password=None, host=None, user=None, port=None, + def __init__(self, password=None, host=None, user=None, port=None, # noqa ssl=False, url=None, force_login_v2=False): """Initialize a Netgear session.""" if not url and not host and not port: - url = autodetect_url() + url = h.autodetect_url() if url: self.soap_url = url + "/soap/server_sa/" else: if not host: - host = DEFAULT_HOST + host = c.DEFAULT_HOST if not port: - port = DEFAULT_PORT + port = c.DEFAULT_PORT scheme = "https" if ssl else "http" self.soap_url = "{}://{}:{}/soap/server_sa/".format(scheme, host, port) if not user: - user = DEFAULT_USER + user = c.DEFAULT_USER self.username = user self.password = password @@ -79,6 +47,9 @@ def __init__(self, password=None, host=None, user=None, port=None, self.cookie = None self.config_started = False + ########################################################################## + # HELPERS + ########################################################################## def login(self): """ Login to the router. @@ -92,13 +63,170 @@ def login(self): return self.login_v2() + def _get_headers(self, service, method, need_auth=True): + headers = h.get_soap_headers(service, method) + # if the stored cookie is not a str then we are + # probably using the old login method + if need_auth and isinstance(self.cookie, str): + headers["Cookie"] = self.cookie + return headers + + def _make_request(self, service, method, params=None, body="", # noqa + need_auth=True): + """Make an API request to the router.""" + # If we have no cookie (v2) or never called login before (v1) + # and we need auth, the request will fail for sure. + if need_auth and not self.cookie: + if not self.login(): + return False, None + + headers = self._get_headers(service, method, need_auth) + + if not body: + if not params: + params = "" + if isinstance(params, dict): + _map = params + params = "" + for k in _map: + params += "<" + k + ">" + _map[k] + "\n" + + body = c.CALL_BODY.format( + service=c.SERVICE_PREFIX + service, + method=method, params=params + ) + + message = c.SOAP_REQUEST.format(session_id=c.SESSION_ID, body=body) + + try: + response = requests.post(self.soap_url, headers=headers, + data=message, timeout=30, verify=False) + # assume we are good and abruptly exit to prevent errors + if method == c.REBOOT: + # print(response.text) + # exit(0) + return True, response + if need_auth and h.is_unauthorized_response(response): + # let's discard the cookie because it probably expired (v2) + # or the IP-bound (?) session expired (v1) + self.cookie = None + + _LOGGER.warning( + "Unauthorized response, let's login and retry..." + ) + if self.login(): + # reset headers with new cookie first + headers = self._get_headers(service, method, need_auth) + response = requests.post( + self.soap_url, headers=headers, + data=message, timeout=30, verify=False + ) + + success = h.is_valid_response(response) + + if not success: + _LOGGER.error("Invalid response") + _LOGGER.debug( + "%s\n%s\n%s", response.status_code, + str(response.headers), response.text + ) + + return success, response + + except requests.exceptions.SSLError: + _LOGGER.exception("SSL Error. Try --no-ssl") + + except requests.exceptions.HTTPError as e: + _LOGGER.exception('HTTP error: %s', e) + + except requests.exceptions.RequestException as e: + _LOGGER.exception('Connection error: %s', e) + # _LOGGER.exception("Error talking to API") + + # Maybe one day we will distinguish between + # different errors.. + + return False, None + + def _get(self, theLog, theService, theEndpoint, # noqa + parseNode, toParse, test=False): + _LOGGER.info(theLog) + success, response = self._make_request( + theService, + theEndpoint + ) + + if test: + print(response.text) + + if not success: + return None + + theInfo = h.to_get(parseNode, toParse, response) + + if not theInfo: + return None + + return theInfo + + def _set(self, theLog, theRequest, test): + + logStart = theLog[0] + logFail = theLog[1] + service = theRequest["service"] + method = theRequest["method"] + params = theRequest["params"] + body = theRequest["body"] + need_auth = theRequest["need_auth"] + + _LOGGER.info(logStart) + if self.config_started: + _LOGGER.error( + "Inconsistant configuration state, " + "configuration already started" + ) + return False + + if not self.config_start(): + _LOGGER.error("Could not start configuration") + return False + + success, response = self._make_request( + service, method, params, body, need_auth) + + if test: + print(response.text) + + if method == c.REBOOT: + # exit(0) + return True + + if not success: + _LOGGER.error(logFail) + return False + + if not self.config_finish(): + _LOGGER.error( + "Inconsistant configuration state, " + "configuration already finished" + ) + return False + + return True + + ########################################################################## + # SERVICE_DEVICE_CONFIG + ########################################################################## def login_v2(self): + """Attempt login.""" _LOGGER.debug("Login v2") self.cookie = None - success, response = self._make_request(SERVICE_DEVICE_CONFIG, "SOAPLogin", - {"Username": self.username, "Password": self.password}, - None, False) + success, response = self._make_request( + c.SERVICE_DEVICE_CONFIG, c.LOGIN, + {"Username": self.username, "Password": self.password}, + None, False + ) if not success: return None @@ -111,20 +239,444 @@ def login_v2(self): return self.cookie + # def logout(self): + + def reboot(self, value='0', test=False): + """Reboot Router.""" + theLog = {} + theLog[0] = "Rebooting Router" + theLog[1] = "Could not successfully reboot router" + value = h.value_to_zero_or_one(value) + theRequest = { + "service": c.SERVICE_DEVICE_CONFIG, + "method": c.REBOOT, + "params": {"NewRebootEnable": value}, + "body": "", + "need_auth": True + } + + theResponse = self._set(theLog, theRequest, test) + + return theResponse + + def check_new_firmware(self, test=False): + """Parse CheckNewFirmware and return dict.""" + theLog = "Check for new firmware" + parseNode = f".//{c.CHECK_NEW_FIRMWARE}Response" + toParse = [ + 'CurrentVersion', + 'NewVersion', + 'ReleaseNote' + ] + + theInfo = self._get( + theLog, c.SERVICE_DEVICE_CONFIG, + c.CHECK_NEW_FIRMWARE, parseNode, toParse, test + ) + + return theInfo + + # def update_new_firmware(self): + + # **NEW** + # Response is GetInfo + def check_app_new_firmware(self, test=False): + """Parse CheckAppNewFirmware and return dict.""" + theLog = "Check for new firmware" + parseNode = f".//{c.GET_DEVICE_CONFIG_INFO}Response" + toParse = [ + 'BlankState', + 'NewBlockSiteEnable', + 'NewBlockSiteName', + 'NewTimeZone', + 'NewDaylightSaving' + ] + + theInfo = self._get( + theLog, c.SERVICE_DEVICE_CONFIG, + c.CHECK_APP_NEW_FIRMWARE, parseNode, toParse, test + ) + + return theInfo + + def config_start(self): + """ + Start a configuration session. + + For managing router admin functionality (ie allowing/blocking devices) + """ + _LOGGER.info("Config start") + + success, _ = self._make_request( + c.SERVICE_DEVICE_CONFIG, c.CONFIGURATION_STARTED, + {"NewSessionID": c.SESSION_ID} + ) + + self.config_started = success + return success + + def config_finish(self): + """ + End of a configuration session. + + Tells the router we're done managing admin functionality. + """ + _LOGGER.info("Config finish") + if not self.config_started: + return True + + success, _ = self._make_request( + c.SERVICE_DEVICE_CONFIG, c.CONFIGURATION_FINISHED, + {"NewStatus": "ChangesApplied"} + ) + + self.config_started = not success + return success + + # **NEW** + def get_device_config_info(self, test=False): + """Parse Device Config GetInfo and return dict.""" + theLog = "Get DeviceConfig Info" + parseNode = f".//{c.GET_DEVICE_CONFIG_INFO}Response" + toParse = [ + 'BlankState', + 'NewBlockSiteEnable', + 'NewBlockSiteName', + 'NewTimeZone', + 'NewDaylightSaving' + ] + + theInfo = self._get( + theLog, c.SERVICE_DEVICE_CONFIG, + c.GET_DEVICE_CONFIG_INFO, parseNode, toParse, test + ) + + return theInfo + + def get_block_device_enable_status(self, test=False): + """Parse GetBlockDeviceEnableStatus and return dict.""" + theLog = "Get Block Device Enable Status" + parseNode = f".//{c.GET_BLOCK_DEVICE_ENABLE_STATUS}Response" + toParse = ['NewBlockDeviceEnable'] + + theInfo = self._get( + theLog, c.SERVICE_DEVICE_CONFIG, + c.GET_BLOCK_DEVICE_ENABLE_STATUS, parseNode, toParse, test + ) + + return theInfo + + def set_block_device_enable(self, value='0', test=False): + """Set SetBlockDeviceEnable.""" + theLog = {} + theLog[0] = "Setting Block Device Enabled" + theLog[1] = "Could not successfully set block device" + value = h.value_to_zero_or_one(value) + theRequest = { + "service": c.SERVICE_DEVICE_CONFIG, + "method": c.SET_BLOCK_DEVICE_ENABLE, + "params": {"NewBlockDeviceEnable": value}, + "body": "", + "need_auth": True + } + + theResponse = self._set(theLog, theRequest, test) + + return theResponse + + # def enable_block_device_for_all(self): + + def set_block_device_by_mac(self, test=False, mac_addr=None, + device_status=c.BLOCK): + """ + Allow or Block a device via its Mac Address. + + Pass in the mac address for the device that you want to set. + Pass in the device_status you wish to set the device to: Allow + (allow device to access the network) or Block (block the device + from accessing the network). + """ + theLog = {} + theLog[0] = "Allow block device" + theLog[1] = "Could not successfully call allow/block device" + theRequest = { + "service": c.SERVICE_DEVICE_CONFIG, + "method": c.SET_BLOCK_DEVICE_BY_MAC, + "params": { + "NewAllowOrBlock": device_status, + "NewMACAddress": mac_addr}, + "body": "", + "need_auth": True + } + + theResponse = False + if mac_addr: + theResponse = self._set(theLog, theRequest, test) + + return theResponse + + def get_traffic_meter_enabled(self, test=False): + """Parse GetTrafficMeterEnabled and return dict.""" + theLog = "Get Traffic Meter Enabled" + parseNode = f".//{c.GET_TRAFFIC_METER_ENABLED}Response" + toParse = [ + 'NewTrafficMeterEnable' + ] + + theInfo = self._get( + theLog, c.SERVICE_DEVICE_CONFIG, + c.GET_TRAFFIC_METER_ENABLED, parseNode, toParse, test + ) + + return theInfo + + def get_traffic_meter_options(self, test=False): + """Parse GetTrafficMeterOptions and return dict.""" + theLog = "Get Traffic Meter Options" + parseNode = f".//{c.GET_TRAFFIC_METER_OPTIONS}Response" + toParse = [ + 'NewControlOption', + 'NewMonthlyLimit', + 'RestartHour', + 'RestartMinute', + 'RestartDay' + ] + + theInfo = self._get( + theLog, c.SERVICE_DEVICE_CONFIG, + c.GET_TRAFFIC_METER_OPTIONS, parseNode, toParse, test + ) + + return theInfo + + def get_traffic_meter_statistics(self, test=False): + """Parse GetTrafficMeterStatistics and return dict.""" + theLog = "Get Traffic Meter Statistics" + parseNode = f".//{c.GET_TRAFFIC_METER_STATISTICS}Response" + toParse = [ + 'NewTodayConnectionTime', + 'NewTodayUpload', + 'NewTodayDownload', + 'NewYesterdayConnectionTime', + 'NewYesterdayUpload', + 'NewYesterdayDownload', + 'NewWeekConnectionTime', + 'NewWeekUpload', + 'NewWeekDownload', + 'NewMonthConnectionTime', + 'NewMonthUpload', + 'NewMonthDownload', + 'NewLastMonthConnectionTime', + 'NewLastMonthUpload', + 'NewLastMonthDownload' + ] + + theInfo = self._get( + theLog, c.SERVICE_DEVICE_CONFIG, + c.GET_TRAFFIC_METER_STATISTICS, parseNode, toParse, test + ) + + return {t: h.parse_text(value) for t, value in theInfo.items()} + + def enable_traffic_meter(self, value='0', test=False): + """Set EnableTrafficMeter.""" + theLog = {} + theLog[0] = "Enabling Traffic Meter" + theLog[1] = "Could not successfully enable traffic meter" + value = h.value_to_zero_or_one(value) + theRequest = { + "service": c.SERVICE_DEVICE_CONFIG, + "method": c.ENABLE_TRAFFIC_METER, + "params": {"NewTrafficMeterEnable": value}, + "body": "", + "need_auth": True + } + + theResponse = self._set(theLog, theRequest, test) + + return theResponse + + # def set_traffic_meter_options(self): + + ########################################################################## + # SERVICE_LAN_CONFIG_SECURITY + ########################################################################## + # **NEW** + def get_lan_config_sec_info(self, test=False): + """Parse LANConfigSecurity Info and return dict.""" + theLog = "Get LANConfigSecurity Info" + parseNode = f".//{c.GET_LAN_CONFIG_SEC_INFO}Response" + toParse = [ + 'NewLANSubnet', + 'NewWANLAN_Subnet_Match', + 'NewLANMACAddress', + 'NewLANIP', + 'NewDHCPEnabled' + ] + + theInfo = self._get( + theLog, c.SERVICE_LAN_CONFIG_SECURITY, + c.GET_LAN_CONFIG_SEC_INFO, parseNode, toParse, test + ) + + return theInfo + + ########################################################################## + # SERVICE_WAN_IP_CONNECTION + ########################################################################## + # **NEW** + def get_wan_ip_con_info(self, test=False): + """Parse WANIPConnection Info and return dict.""" + theLog = "Get WANIPConnection Info" + parseNode = f".//{c.GET_WAN_IP_CON_INFO}Response" + toParse = [ + 'NewEnable', + 'NewConnectionType', + 'NewExternalIPAddress', + 'NewSubnetMask', + 'NewAddressingType', + 'NewDefaultGateway', + 'NewMACAddress', + 'NewMACAddressOverride', + 'NewMaxMTUSize', + 'NewDNSEnabled', + 'NewDNSServers' + ] + + theInfo = self._get( + theLog, c.SERVICE_WAN_IP_CONNECTION, + c.GET_WAN_IP_CON_INFO, parseNode, toParse, test + ) + + return theInfo + + ########################################################################## + # SERVICE_PARENTAL_CONTROL + ########################################################################## def login_v1(self): + """Attempt login.""" _LOGGER.debug("Login v1") - body = LOGIN_V1_BODY.format(username=self.username, - password=self.password) + body = c.LOGIN_V1_BODY.format( + username=self.username, password=self.password + ) - success, _ = self._make_request("ParentalControl:1", "Authenticate", - None, body, False) + success, _ = self._make_request( + c.SERVICE_PARENTAL_CONTROL, c.LOGIN_OLD, None, body, False + ) self.cookie = success return success - def get_attached_devices(self): + def get_parental_control_enable_status(self, test=False): + """Parse GetEnableStatus and return dict.""" + theLog = "Get Parent Control Enable Status" + parseNode = f".//{c.GET_PARENTAL_CONTROL_ENABLE_STATUS}Response" + toParse = ['ParentalControl'] + + theInfo = self._get( + theLog, c.SERVICE_PARENTAL_CONTROL, + c.GET_PARENTAL_CONTROL_ENABLE_STATUS, parseNode, toParse, test + ) + + return theInfo + + def enable_parental_control(self, value='0', test=False): + """Set EnableParentalControl.""" + theLog = {} + theLog[0] = "Enabling Parental Control" + theLog[1] = "Could not successfully enable parental control" + value = h.value_to_zero_or_one(value) + theRequest = { + "service": c.SERVICE_PARENTAL_CONTROL, + "method": c.ENABLE_PARENTAL_CONTROL, + "params": {"NewEnable": value}, + "body": "", + "need_auth": True + } + + theResponse = self._set(theLog, theRequest, test) + + return theResponse + + def get_all_mac_addresses(self, test=False): + """Parse GetAllMACAddresses and return dict.""" + theLog = "Get All MAC Addresses" + parseNode = f".//{c.GET_ALL_MAC_ADDRESSES}Response" + toParse = [ + 'AllMACAddresses' + ] + + theInfo = self._get( + theLog, c.SERVICE_PARENTAL_CONTROL, c.GET_ALL_MAC_ADDRESSES, + parseNode, toParse, test + ) + + return theInfo + + def get_dns_masq_device_id(self, test=False): + """Parse GetDNSMasqDeviceID and return dict.""" + theLog = "Get DNS Masq Device ID" + parseNode = f".//{c.GET_DNS_MASQ_DEVICE_ID}Response" + toParse = [ + 'NewDeviceID' + ] + + theInfo = self._get( + theLog, c.SERVICE_PARENTAL_CONTROL, c.GET_DNS_MASQ_DEVICE_ID, + parseNode, toParse, test + ) + + return theInfo + + # def set_dns_masq_device_id(self): + # def delete_mac_address(self): + + ########################################################################## + # SERVICE_DEVICE_INFO + ########################################################################## + def get_info(self, test=False): + """Parse GetInfo and return dict.""" + theLog = "Get Info" + parseNode = f".//{c.GET_INFO}Response" + toParse = [ + 'ModelName', 'Description', 'SerialNumber', 'Firmwareversion', + 'SmartAgentversion', 'FirewallVersion', 'VPNVersion', + 'OthersoftwareVersion', 'Hardwareversion', 'Otherhardwareversion', + 'FirstUseDate', 'DeviceName', 'FirmwareDLmethod', + 'FirmwareLastUpdate', 'FirmwareLastChecked', 'DeviceMode' + ] + + theInfo = self._get( + theLog, c.SERVICE_DEVICE_INFO, c.GET_INFO, parseNode, toParse, test + ) + + return theInfo + + def get_support_feature_list_XML(self, test=False): + """Parse getSupportFeatureListXML and return dict.""" + theLog = "Get Support Feature List" + parseNode = ( + f".//{c.GET_SUPPORT_FEATURE_LIST_XML}" + "Response/newFeatureList/features" + ) + + toParse = [ + 'DynamicQoS', 'OpenDNSParentalControl', + 'MaxMonthlyTrafficLimitation', 'AccessControl', 'SpeedTest', + 'GuestNetworkSchedule', 'TCAcceptance', 'SmartConnect', + 'AttachedDevice', 'NameNTGRDevice', 'PasswordReset' + ] + + theInfo = self._get( + theLog, c.SERVICE_DEVICE_INFO, c.GET_SUPPORT_FEATURE_LIST_XML, + parseNode, toParse, test + ) + + return theInfo + + def get_attached_devices(self, test=False): # noqa """ Return list of connected devices to the router. @@ -132,24 +684,24 @@ def get_attached_devices(self): """ _LOGGER.info("Get attached devices") - success, response = self._make_request(SERVICE_DEVICE_INFO, - "GetAttachDevice") + success, response = self._make_request(c.SERVICE_DEVICE_INFO, + c.GET_ATTACHED_DEVICES) if not success: _LOGGER.error("Get attached devices failed") return None - success, node = _find_node( + success, node = h.find_node( response.text, - ".//GetAttachDeviceResponse/NewAttachDevice") + f".//{c.GET_ATTACHED_DEVICES}Response/NewAttachDevice") if not success: return None devices = [] # Netgear inserts a double-encoded value for "unknown" devices - decoded = node.text.strip().replace(UNKNOWN_DEVICE_ENCODED, - UNKNOWN_DEVICE_DECODED) + decoded = node.text.strip().replace(c.UNKNOWN_DEVICE_ENCODED, + c.UNKNOWN_DEVICE_DECODED) if not decoded or decoded == "0": _LOGGER.error("Can't parse attached devices string") @@ -161,7 +713,7 @@ def get_attached_devices(self): # First element is the total device count entry_count = None if len(entries) > 1: - entry_count = _convert(entries.pop(0), int) + entry_count = h.convert(entries.pop(0), int) if entry_count is not None and entry_count != len(entries): _LOGGER.info( @@ -171,7 +723,7 @@ def get_attached_devices(self): for entry in entries: info = entry.split(";") - if len(info) == 0: + if not info: continue # Not all routers will report those @@ -184,8 +736,8 @@ def get_attached_devices(self): allow_or_block = info[7] if len(info) >= 7: link_type = info[4] - link_rate = _convert(info[5], int) - signal = _convert(info[6], int) + link_rate = h.convert(info[5], int) + signal = h.convert(info[6], int) if len(info) < 4: _LOGGER.warning("Unexpected entry: %s", info) @@ -199,7 +751,7 @@ def get_attached_devices(self): return devices - def get_attached_devices_2(self): + def get_attached_devices_2(self, test=False): # noqa """ Return list of connected devices to the router with details. @@ -209,309 +761,499 @@ def get_attached_devices_2(self): """ _LOGGER.info("Get attached devices 2") - success, response = self._make_request(SERVICE_DEVICE_INFO, - "GetAttachDevice2") + success, response = self._make_request(c.SERVICE_DEVICE_INFO, + c.GET_ATTACHED_DEVICES_2) if not success: return None - success, devices_node = _find_node( + success, devices_node = h.find_node( response.text, - ".//GetAttachDevice2Response/NewAttachDevice") + f".//{c.GET_ATTACHED_DEVICES_2}Response/NewAttachDevice") if not success: return None xml_devices = devices_node.findall("Device") devices = [] for d in xml_devices: - ip = _xml_get(d, 'IP') - name = _xml_get(d, 'Name') - mac = _xml_get(d, 'MAC') - signal = _convert(_xml_get(d, 'SignalStrength'), int) - link_type = _xml_get(d, 'ConnectionType') - link_rate = _xml_get(d, 'Linkspeed') - allow_or_block = _xml_get(d, 'AllowOrBlock') - device_type = _convert(_xml_get(d, 'DeviceType'), int) - device_model = _xml_get(d, 'DeviceModel') - ssid = _xml_get(d, 'SSID') - conn_ap_mac = _xml_get(d, 'ConnAPMAC') + ip = h.xml_get(d, 'IP') + name = h.xml_get(d, 'Name') + mac = h.xml_get(d, 'MAC') + signal = h.convert(h.xml_get(d, 'SignalStrength'), int) + link_type = h.xml_get(d, 'ConnectionType') + link_rate = h.xml_get(d, 'Linkspeed') + allow_or_block = h.xml_get(d, 'AllowOrBlock') + device_type = h.convert(h.xml_get(d, 'DeviceType'), int) + device_model = h.xml_get(d, 'DeviceModel') + ssid = h.xml_get(d, 'SSID') + conn_ap_mac = h.xml_get(d, 'ConnAPMAC') devices.append(Device(name, ip, mac, link_type, signal, link_rate, allow_or_block, device_type, device_model, ssid, conn_ap_mac)) return devices - def get_traffic_meter(self): - """ - Return dict of traffic meter stats. - - Returns None if error occurred. - """ - _LOGGER.info("Get traffic meter") - - def parse_text(text): - """ - there are three kinds of values in the returned data - This function parses the different values and returns - (total, avg), timedelta or a plain float - """ - def tofloats(lst): return (float(t) for t in lst) - try: - if "/" in text: # "6.19/0.88" total/avg - return tuple(tofloats(text.split('/'))) - elif ":" in text: # 11:14 hr:mn - hour, mins = tofloats(text.split(':')) - return timedelta(hours=hour, minutes=mins) - else: - return float(text) - except ValueError: - return None - - success, response = self._make_request(SERVICE_DEVICE_CONFIG, - "GetTrafficMeterStatistics") - if not success: - return None - - success, node = _find_node( - response.text, - ".//GetTrafficMeterStatisticsResponse") - if not success: - return None - - return {t.tag: parse_text(t.text) for t in node} - - def config_start(self): - """ - Start a configuration session. - For managing router admin functionality (ie allowing/blocking devices) - """ - _LOGGER.info("Config start") - - success, _ = self._make_request( - SERVICE_DEVICE_CONFIG, "ConfigurationStarted", {"NewSessionID": SESSION_ID}) - - self.config_started = success - return success - - def config_finish(self): - """ - End of a configuration session. - Tells the router we're done managing admin functionality. - """ - _LOGGER.info("Config finish") - if not self.config_started: - return True - - success, _ = self._make_request( - SERVICE_DEVICE_CONFIG, "ConfigurationFinished", {"NewStatus": "ChangesApplied"}) - - self.config_started = not success - return success - - def allow_block_device(self, mac_addr, device_status=BLOCK): - """ - Allow or Block a device via its Mac Address. - Pass in the mac address for the device that you want to set. Pass in the - device_status you wish to set the device to: Allow (allow device to access the - network) or Block (block the device from accessing the network). - """ - _LOGGER.info("Allow block device") - if self.config_started: - _LOGGER.error("Inconsistant configuration state, configuration already started") - return False - - if not self.config_start(): - _LOGGER.error("Could not start configuration") - return False - - success, _ = self._make_request( - SERVICE_DEVICE_CONFIG, "SetBlockDeviceByMAC", - {"NewAllowOrBlock": device_status, "NewMACAddress": mac_addr}) - - if not success: - _LOGGER.error("Could not successfully call allow/block device") - return False - - if not self.config_finish(): - _LOGGER.error("Inconsistant configuration state, configuration already finished") - return False - - return True - - def _get_headers(self, service, method, need_auth=True): - headers = _get_soap_headers(service, method) - # if the stored cookie is not a str then we are - # probably using the old login method - if need_auth and isinstance(self.cookie, str): - headers["Cookie"] = self.cookie - return headers - - def _make_request(self, service, method, params=None, body="", - need_auth=True): - """Make an API request to the router.""" - # If we have no cookie (v2) or never called login before (v1) - # and we need auth, the request will fail for sure. - if need_auth and not self.cookie: - if not self.login(): - return False, None - - headers = self._get_headers(service, method, need_auth) - - if not body: - if not params: - params = "" - if isinstance(params, dict): - _map = params - params = "" - for k in _map: - params += "<" + k + ">" + _map[k] + "\n" - - body = CALL_BODY.format(service=SERVICE_PREFIX + service, - method=method, params=params) - - message = SOAP_REQUEST.format(session_id=SESSION_ID, body=body) - - try: - response = requests.post(self.soap_url, headers=headers, - data=message, timeout=30, verify=False) - - if need_auth and _is_unauthorized_response(response): - # let's discard the cookie because it probably expired (v2) - # or the IP-bound (?) session expired (v1) - self.cookie = None - - _LOGGER.warning("Unauthorized response, let's login and retry...") - if self.login(): - # reset headers with new cookie first - headers = self._get_headers(service, method, need_auth) - response = requests.post(self.soap_url, headers=headers, - data=message, timeout=30, verify=False) - - success = _is_valid_response(response) - - if not success: - _LOGGER.error("Invalid response") - _LOGGER.debug("%s\n%s\n%s", response.status_code, str(response.headers), response.text) - - return success, response - - except requests.exceptions.RequestException: - _LOGGER.exception("Error talking to API") - - # Maybe one day we will distinguish between - # different errors.. - return False, None - - -def autodetect_url(): - """ - Try to autodetect the base URL of the router SOAP service. - - Returns None if it can't be found. - """ - for url in ["http://routerlogin.net:5000", "https://routerlogin.net", - "http://routerlogin.net"]: - try: - r = requests.get(url + "/soap/server_sa/", - headers=_get_soap_headers("Test:1", "test"), - verify=False) - if r.status_code == 200: - return url - except requests.exceptions.RequestException: - pass - - return None - - -def _find_node(text, xpath): - text = _illegal_xml_chars_RE.sub('', text) - it = ET.iterparse(StringIO(text)) - # strip all namespaces - for _, el in it: - if '}' in el.tag: - el.tag = el.tag.split('}', 1)[1] - node = it.root.find(xpath) - if node is None: - _LOGGER.error("Error finding node in XML response") - _LOGGER.debug(text) - return False, None - - return True, node - - -def _xml_get(e, name): - """ - Returns the value of the subnode "name" of element e. - - Returns None if the subnode doesn't exist - """ - r = e.find(name) - if r is not None: - return r.text - return None - - -def _get_soap_headers(service, method): - action = SERVICE_PREFIX + service + "#" + method - return { - "SOAPAction": action, - "Cache-Control": "no-cache", - "User-Agent": "pynetgear", - "Content-Type": "multipart/form-data" - } - - -def _is_valid_response(resp): - return (resp.status_code == 200 and - ("0000" in resp.text or - "000" in resp.text)) - - -def _is_unauthorized_response(resp): - return (resp.status_code == 401 or - "401" in resp.text) - - -def _convert(value, to_type, default=None): - """Convert value to to_type, returns default if fails.""" - try: - return default if value is None else to_type(value) - except ValueError: - # If value could not be converted - return default - - -SERVICE_PREFIX = "urn:NETGEAR-ROUTER:service:" -SERVICE_DEVICE_INFO = "DeviceInfo:1" -SERVICE_DEVICE_CONFIG = "DeviceConfig:1" - -REGEX_ATTACHED_DEVICES = r"(.*)" - -# Until we know how to generate it, give the one we captured -SESSION_ID = "A7D88AE69687E58D9A00" - -SOAP_REQUEST = """ - - -{session_id} - -{body} - -""" - -LOGIN_V1_BODY = """ - - {username} - {password} - -""" - -CALL_BODY = """ - -{params} -""" - -UNKNOWN_DEVICE_DECODED = '' -UNKNOWN_DEVICE_ENCODED = '<unknown>' + # def set_device_name_icon_by_mac(self): + # def set_device_name(self): + + ########################################################################## + # SERVICE_ADVANCED_QOS + ########################################################################## + def set_speed_test_start(self, test=False): + """Start the speed test.""" + theLog = {} + theLog[0] = "Starting a speed test" + theLog[1] = "Could not successfully start speed test" + theRequest = { + "service": c.SERVICE_ADVANCED_QOS, + "method": c.SET_SPEED_TEST_START, + "params": None, + "body": "", + "need_auth": True + } + + theResponse = self._set(theLog, theRequest, test) + + return theResponse + + def get_speed_test_result(self, test=False): + """Get the speed test result and return dict.""" + # Response Code = 1 means in progress + theLog = "Get Speed Test Result" + parseNode = f".//{c.GET_SPEED_TEST_RESULT}Response" + toParse = [ + 'NewOOKLAUplinkBandwidth', + 'NewOOKLADownlinkBandwidth', + 'AveragePing' + ] + + theInfo = self._get( + theLog, c.SERVICE_ADVANCED_QOS, c.GET_SPEED_TEST_RESULT, + parseNode, toParse, test + ) + + return theInfo + + def get_qos_enable_status(self, test=False): + """Parse getQoSEnableStatus and return dict.""" + theLog = "Get QOS Enable Status" + parseNode = f".//{c.GET_QOS_ENABLE_STATUS}Response" + toParse = [ + 'NewQoSEnableStatus' + ] + + theInfo = self._get( + theLog, c.SERVICE_ADVANCED_QOS, c.GET_QOS_ENABLE_STATUS, + parseNode, toParse, test + ) + + return theInfo + + def set_qos_enable_status(self, value='0', test=False): + """Set SetQoSEnableStatus.""" + theLog = {} + theLog[0] = "Setting Guest Access Enabled" + theLog[1] = "Could not successfully set guest access" + value = h.value_to_zero_or_one(value) + theRequest = { + "service": c.SERVICE_ADVANCED_QOS, + "method": c.SET_QOS_ENABLE_STATUS, + "params": {"NewQoSEnable": value}, + "body": "", + "need_auth": True + } + + theResponse = self._set(theLog, theRequest, test) + + return theResponse + + def get_bandwidth_control_options(self, test=False): + """Parse GetBandwidthControlOptions and return dict.""" + theLog = "Get Bandwidth Control Options" + parseNode = f".//{c.GET_BANDWIDTH_CONTROL_OPTIONS}Response" + toParse = [ + 'NewUplinkBandwidth', 'NewDownlinkBandwidth', 'NewSettingMethod' + ] + + theInfo = self._get( + theLog, c.SERVICE_ADVANCED_QOS, c.GET_BANDWIDTH_CONTROL_OPTIONS, + parseNode, toParse, test + ) + + return theInfo + + # def set_bandwidth_control_options(self): + + # Does Not Work + def get_current_app_bandwidth(self, test=False): + """Parse GetCurrentAppBandwidth and return dict.""" + theLog = "Get Current App Bandwidth" + parseNode = f".//{c.GET_CURRENT_APP_BANDWIDTH}Response" + toParse = [] + + theInfo = self._get( + theLog, c.SERVICE_ADVANCED_QOS, c.GET_CURRENT_APP_BANDWIDTH, + parseNode, toParse, test + ) + + return theInfo + + # Does Not Work + def get_current_device_bandwidth(self, test=False): + """Parse GetCurrentDeviceBandwidth and return dict.""" + theLog = "Get Current Device Bandwidth" + parseNode = f".//{c.GET_CURRENT_DEVICE_BANDWIDTH}Response" + toParse = [] + + theInfo = self._get( + theLog, c.SERVICE_ADVANCED_QOS, c.GET_CURRENT_DEVICE_BANDWIDTH, + parseNode, toParse, test + ) + + return theInfo + + # Does Not Work + def get_current_app_bandwidth_by_mac(self, test=False): + """Parse GetCurrentAppBandwidthByMAC and return dict.""" + theLog = "Get Current Device Bandwidth by MAC" + parseNode = f".//{c.GET_CURRENT_APP_BANDWIDTH_BY_MAC}Response" + toParse = [] + + theInfo = self._get( + theLog, c.SERVICE_ADVANCED_QOS, c.GET_CURRENT_APP_BANDWIDTH_BY_MAC, + parseNode, toParse, test + ) + + return theInfo + + ########################################################################## + # SERVICE_WLAN_CONFIGURATION + ########################################################################## + def get_guest_access_enabled(self, test=False): + """Parse GetGuestAccessEnabled and return dict.""" + theLog = "Get Guest Access Enabled" + parseNode = f".//{c.GET_GUEST_ACCESS_ENABLED}Response" + toParse = ['NewGuestAccessEnabled'] + + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, c.GET_GUEST_ACCESS_ENABLED, + parseNode, toParse, test + ) + + if not theInfo: + + # Parse GetGuestAccessEnabled2 and return dict. + theLog = "Get Guest Access Enabled" + parseNode = f".//{c.GET_GUEST_ACCESS_ENABLED_2}Response" + toParse = ['NewGuestAccessEnabled'] + + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, + c.GET_GUEST_ACCESS_ENABLED_2, parseNode, toParse, test + ) + + return theInfo + + def get_5g_guest_access_enabled(self, test=False): + """Parse Get5GGuestAccessEnabled and return dict.""" + theLog = "Get 5G Guest Access Enabled" + parseNode = f".//{c.GET_5G1_GUEST_ACCESS_ENABLED}Response" + toParse = ['NewGuestAccessEnabled'] + + _LOGGER.debug("Trying %s", c.GET_5G1_GUEST_ACCESS_ENABLED) + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, + c.GET_5G1_GUEST_ACCESS_ENABLED, parseNode, toParse, test + ) + + if not theInfo: + + # Parse Get5G1GuestAccessEnabled and return dict. + theLog = "Get 5G1 Guest Access Enabled 2" + parseNode = f".//{c.GET_5G1_GUEST_ACCESS_ENABLED_2}Response" + toParse = ['NewGuestAccessEnabled'] + + _LOGGER.debug("Trying %s", c.GET_5G1_GUEST_ACCESS_ENABLED_2) + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, + c.GET_5G1_GUEST_ACCESS_ENABLED_2, parseNode, toParse, test + ) + + if not theInfo: + + # Parse Get5GGuestAccessEnabled2 and return dict. + theLog = "Get 5G Guest Access Enabled 2" + parseNode = f".//{c.GET_5G_GUEST_ACCESS_ENABLED_2}Response" + toParse = ['NewGuestAccessEnabled'] + + _LOGGER.debug("Trying %s", c.GET_5G_GUEST_ACCESS_ENABLED_2) + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, + c.GET_5G_GUEST_ACCESS_ENABLED_2, parseNode, toParse, test + ) + + return theInfo + + def set_guest_access_enabled(self, value='0', test=False): + """Set SetGuestAccessEnabled.""" + theLog = {} + theLog[0] = "Setting Guest Access Enabled" + theLog[1] = "Could not successfully set guest access" + value = h.value_to_zero_or_one(value) + theRequest = { + "service": c.SERVICE_WLAN_CONFIGURATION, + "method": c.SET_GUEST_ACCESS_ENABLED, + "params": {"NewGuestAccessEnabled": value}, + "body": "", + "need_auth": True + } + + theResponse = self._set(theLog, theRequest, test) + + return theResponse + + def set_guest_access_enabled_2(self, value='0', test=False): + """Set SetGuestAccessEnabled2.""" + theLog = {} + theLog[0] = "Setting Guest Access Enabled" + theLog[1] = "Could not successfully set guest access" + value = h.value_to_zero_or_one(value) + theRequest = { + "service": c.SERVICE_WLAN_CONFIGURATION, + "method": c.SET_GUEST_ACCESS_ENABLED_2, + "params": {"NewGuestAccessEnabled": value}, + "body": "", + "need_auth": True + } + + theResponse = self._set(theLog, theRequest, test) + + return theResponse + + def set_5g_guest_access_enabled(self, value='0', test=False): + """Set Set5GGuestAccessEnabled.""" + theLog = {} + theLog[0] = "Setting 5G Guest Access Enabled" + theLog[1] = "Could not successfully set 5G guest access" + value = h.value_to_zero_or_one(value) + theRequest = { + "service": c.SERVICE_WLAN_CONFIGURATION, + "method": c.SET_5G_GUEST_ACCESS_ENABLED, + "params": {"NewGuestAccessEnabled": value}, + "body": "", + "need_auth": True + } + + theResponse = self._set(theLog, theRequest, test) + + return theResponse + + def set_5g_guest_access_enabled_2(self, value='0', test=False): + """Set Set5GGuestAccessEnabled2.""" + theLog = {} + theLog[0] = "Setting 5G Guest Access Enabled" + theLog[1] = "Could not successfully set 5G guest access" + value = h.value_to_zero_or_one(value) + theRequest = { + "service": c.SERVICE_WLAN_CONFIGURATION, + "method": c.SET_5G_GUEST_ACCESS_ENABLED_2, + "params": {"NewGuestAccessEnabled": value}, + "body": "", + "need_auth": True + } + + theResponse = self._set(theLog, theRequest, test) + + return theResponse + + def set_5g_guest_access_enabled_3(self, value='0', test=False): + """Set Set5G1GuestAccessEnabled2.""" + theLog = {} + theLog[0] = "Setting 5G Guest Access Enabled" + theLog[1] = "Could not successfully set 5G guest access" + value = h.value_to_zero_or_one(value) + theRequest = { + "service": c.SERVICE_WLAN_CONFIGURATION, + "method": c.SET_5G1_GUEST_ACCESS_ENABLED_2, + "params": {"NewGuestAccessEnabled": value}, + "body": "", + "need_auth": True + } + + theResponse = self._set(theLog, theRequest, test) + + return theResponse + + def get_wpa_security_keys(self, test=False): + """Parse GetWPASecurityKeys and return dict.""" + theLog = "Get WPA Security Keys" + parseNode = f".//{c.GET_WPA_SECURITY_KEYS}Response" + toParse = ['NewWPAPassphrase'] + + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, c.GET_WPA_SECURITY_KEYS, + parseNode, toParse, test + ) + + return theInfo + + def get_5g_wpa_security_keys(self, test=False): + """Parse Get5GWPASecurityKeys and return dict.""" + theLog = "Get 5G WPA Security Keys" + parseNode = f".//{c.GET_5G_WPA_SECURITY_KEYS}Response" + toParse = ['NewWPAPassphrase'] + + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, c.GET_5G_WPA_SECURITY_KEYS, + parseNode, toParse, test + ) + + return theInfo + + def get_5g_info(self, test=False): + """Parse Get5GInfo and return dict.""" + theLog = "Get 5G Info" + parseNode = f".//{c.GET_5G_INFO}Response" + toParse = [ + 'NewEnable', + 'NewSSIDBroadcast', + 'NewStatus', + 'NewSSID', + 'NewRegion', + 'NewChannel', + 'NewWirelessMode', + 'NewBasicEncryptionModes', + 'NewWEPAuthType', + 'NewWPAEncryptionModes', + 'NewWLANMACAddress', + ] + + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, c.GET_5G_INFO, + parseNode, toParse, test + ) + + return theInfo + + def get_2g_info(self, test=False): + """Parse GetInfo and return dict.""" + theLog = "Get 2G Info" + parseNode = f".//{c.GET_2G_INFO}Response" + toParse = [ + 'NewEnable', + 'NewSSIDBroadcast', + 'NewStatus', + 'NewSSID', + 'NewRegion', + 'NewChannel', + 'NewWirelessMode', + 'NewBasicEncryptionModes', + 'NewWEPAuthType', + 'NewWPAEncryptionModes', + 'NewWLANMACAddress', + ] + + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, c.GET_2G_INFO, + parseNode, toParse, test + ) + + return theInfo + + # def set_5g_wlan_wpa_psk_by_passphrase(self): + + # Response is GetInfo + def get_available_channel(self, test=False): + """Parse GetAvailableChannel and return dict.""" + theLog = "Get Available Channel" + parseNode = f".//{c.GET_2G_INFO}Response" + toParse = [ + 'NewEnable', + 'NewSSIDBroadcast', + 'NewStatus', + 'NewSSID', + 'NewRegion', + 'NewChannel', + 'NewWirelessMode', + 'NewBasicEncryptionModes', + 'NewWEPAuthType', + 'NewWPAEncryptionModes', + 'NewWLANMACAddress', + ] + + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, c.GET_AVAILABLE_CHANNEL, + parseNode, toParse, test + ) + + return theInfo + + def get_guest_access_network_info(self, test=False): + """Parse GetGuestAccessNetworkInfo and return dict.""" + theLog = "Get Guest Access Network Info" + parseNode = f".//{c.GET_GUEST_ACCESS_NETWORK_INFO}Response" + toParse = [ + 'NewSSID', + 'NewSecurityMode', + 'NewKey', + 'UserSetSchedule', + 'Schedule', + ] + + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, + c.GET_GUEST_ACCESS_NETWORK_INFO, parseNode, toParse, test + ) + + return theInfo + + # def set_guest_access_network(self): + + def get_5g_guest_access_network_info(self, test=False): + """Parse Get5GGuestAccessNetworkInfo and return dict.""" + theLog = "Get 5G Guest Access Network Info" + parseNode = f".//{c.GET_5G_GUEST_ACCESS_NETWORK_INFO}Response" + toParse = [ + 'NewSSID', + 'NewSecurityMode', + 'NewKey', + 'UserSetSchedule', + 'Schedule', + ] + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, + c.GET_5G_GUEST_ACCESS_NETWORK_INFO, parseNode, toParse, test + ) + + return theInfo + + # **NEW** + def get_smart_connect_enabled(self, test=False): + """Parse IsSmartConnectEnabled and return dict.""" + theLog = "Get Smart Connect Status" + parseNode = f".//{c.GET_SMART_CONNECT_ENABLED}Response" + toParse = ['NewSmartConnectEnable'] + theInfo = self._get( + theLog, c.SERVICE_WLAN_CONFIGURATION, + c.GET_SMART_CONNECT_ENABLED, parseNode, toParse, test + ) + + return theInfo + + # **NEW** + def set_smart_connect_enabled(self, value='0', test=False): + """Set SetSmartConnectEnable.""" + theLog = {} + theLog[0] = "Setting Smart Connect Enabled" + theLog[1] = "Could not successfully enable smart connect" + value = h.value_to_zero_or_one(value) + theRequest = { + "service": c.SERVICE_WLAN_CONFIGURATION, + "method": c.SET_SMART_CONNECT_ENABLED, + "params": {"NewSmartConnectEnable": value}, + "body": "", + "need_auth": True + } + + theResponse = self._set(theLog, theRequest, test) + + return theResponse diff --git a/pynetgear/__main__.py b/pynetgear/__main__.py index 55ece3f..0342084 100644 --- a/pynetgear/__main__.py +++ b/pynetgear/__main__.py @@ -1,22 +1,34 @@ +# encoding: utf-8 """Run PyNetgear from the command-line.""" import sys import os from argparse import ArgumentParser -from pynetgear import Netgear, BLOCK, ALLOW +from . import Netgear # pylint: disable=relative-beyond-top-level +from .const import ALLOW, BLOCK # pylint: disable=relative-beyond-top-level +from .commands import COMMANDS # pylint: disable=relative-beyond-top-level -def make_formatter(format_name): - """Returns a callable that outputs the data. Defaults to print.""" - +def make_formatter(format_name): # noqa # pylama C901 + """Return a callable that outputs the data. Defaults to print.""" if "json" in format_name: from json import dumps import datetime - def jsonhandler(obj): obj.isoformat() if isinstance(obj, (datetime.datetime, datetime.date)) else obj + + def jsonhandler(obj): + if isinstance(obj, (datetime.datetime, datetime.date)): + return obj.isoformat() + + return obj + if format_name == "prettyjson": - def jsondumps(data): return dumps(data, default=jsonhandler, indent=2, separators=(',', ': ')) + def jsondumps(data): + return dumps( + data, default=jsonhandler, indent=2, separators=(',', ': ') + ) else: - def jsondumps(data): return dumps(data, default=jsonhandler) + def jsondumps(data): + return dumps(data, default=jsonhandler) def jsonify(data): if isinstance(data, dict): @@ -37,20 +49,24 @@ def printer(data): def argparser(): - """Constructs the ArgumentParser for the CLI""" - + """Construct the ArgumentParser for the CLI.""" parser = ArgumentParser(prog='pynetgear') - parser.add_argument("--format", choices=['json', 'prettyjson', 'py'], default='prettyjson') + parser.add_argument( + "--format", choices=['json', 'prettyjson', 'py'], default='prettyjson' + ) + # Connection Config router_args = parser.add_argument_group("router connection config") router_args.add_argument("--host", help="Hostname for the router") router_args.add_argument("--user", help="Account for login") router_args.add_argument("--port", help="Port exposed on the router") - router_args.add_argument("--login-v2", help="Force the use of the cookie-based authentication", - dest="force_login_v2", default=False, action="store_true") router_args.add_argument( - "--password", + "--login-v2", help="Force the use of the cookie-based authentication", + dest="force_login_v2", default=False, action="store_true" + ) + router_args.add_argument( + "-p", "--password", help="Not required with a wired connection." + "Optionally, set the PYNETGEAR_PASSWORD environment variable") router_args.add_argument( @@ -62,62 +78,94 @@ def argparser(): subparsers = parser.add_subparsers( description="Runs subcommand against the specified router", - dest="subcommand") + dest="subcommand", metavar="") - block_parser = subparsers.add_parser( - "block_device", - help="Blocks a device from connecting by mac address") - block_parser.add_argument("--mac-addr") + # loop through COMMANDS and create the cli args + for command, value in COMMANDS.items(): - allow_parser = subparsers.add_parser( - "allow_device", - help="Allows a device with the mac address to connect") - allow_parser.add_argument("--mac-addr") + if len(value) == 3: + strAddParser = subparsers.add_parser(command, help=value[1]) - subparsers.add_parser("login", help="Attempts to login to router.") + for _, aArgs in value[2].items(): - attached_devices = subparsers.add_parser("attached_devices", help="Outputs all attached devices") - attached_devices.add_argument( - "-v", "--verbose", - action="store_true", - default=False, - help="Choose between verbose and slower or terse and fast.") + if aArgs[2]: + strAddParser.add_argument( + aArgs[0], aArgs[1], + help=aArgs[4], choices=aArgs[2]) - subparsers.add_parser("traffic_meter", help="Output router's traffic meter data") - - return parser + elif aArgs[3] == 'store_true': + strAddParser.add_argument( + aArgs[0], aArgs[1], help=aArgs[4], + action='store_true', default=False) + else: + strAddParser.add_argument( + aArgs[0], aArgs[1], help=aArgs[4]) -def run_subcommand(netgear, args): - """Runs the subcommand configured in args on the netgear session""" + else: + subparsers.add_parser(command, help=value[1]) - subcommand = args.subcommand + return parser - if subcommand == "block_device" or subcommand == "allow_device": - return netgear.allow_block_device(args.mac_addr, BLOCK if subcommand == "block_device" else ALLOW) - if subcommand == "attached_devices": - if args.verbose: - return netgear.get_attached_devices_2() +def run_subcommand(netgear, args): # noqa # pylama C901 + """Run the subcommand configured in args on the netgear session.""" + subcommand = args.subcommand + response = None + + if subcommand in COMMANDS: + theFunction = COMMANDS[subcommand][0] + test = False + verbose = False + enable = False + mac = False + action = None + if hasattr(args, 'test'): + test = args.test + if hasattr(args, 'verbose'): + verbose = args.verbose + if hasattr(args, 'enable'): + enable = args.enable + if hasattr(args, 'mac'): + mac = args.mac + if hasattr(args, 'action'): + action = args.action + + # MOST functions have a test argument + # Handle verbose cl arg + if verbose: + response = getattr(netgear, 'get_attached_devices_2')(test) + # Special case for block device + elif subcommand == 'block_device_cli': + theAction = BLOCK + if action == 'allow': + theAction = ALLOW + response = getattr(netgear, theFunction)(test, mac, theAction) + # If enable = y|n + elif enable: + response = getattr(netgear, theFunction)(test, enable) + # if command with test, and test=true + elif test: + response = getattr(netgear, theFunction)(test) + # fallback else: - return netgear.get_attached_devices() + response = getattr(netgear, theFunction)() - if subcommand == 'traffic_meter': - return netgear.get_traffic_meter() - - if subcommand == 'login': - return netgear.login() + else: + print("Unknown subcommand") - print("Unknown subcommand") + return response def main(): """Scan for devices and print results.""" - args = argparser().parse_args(sys.argv[1:]) password = os.environ.get('PYNETGEAR_PASSWORD') or args.password - netgear = Netgear(password, args.host, args.user, args.port, args.ssl, args.url, args.force_login_v2) + netgear = Netgear( + password, args.host, args.user, args.port, + args.ssl, args.url, args.force_login_v2 + ) results = run_subcommand(netgear, args) formatter = make_formatter(args.format) diff --git a/pynetgear/commands.py b/pynetgear/commands.py new file mode 100644 index 0000000..26730f5 --- /dev/null +++ b/pynetgear/commands.py @@ -0,0 +1,410 @@ +# encoding: utf-8 +"""Dict of COMMANDS.""" +# clArg: [function, help, args:{ +# shortCommand, LongCommand, choices +# store_true, help +# }] + +COMMANDS = { + # --------------------- + # SERVICE_DEVICE_CONFIG + # --------------------- + 'login': ['login', 'Attempts to login to router'], + 'reboot': ['reboot', 'Reboot Router', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'check_fw': ['check_new_firmware', 'Check for new firmware', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'check_app_fw': ['check_app_new_firmware', 'Check app for new firmware', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'get_device_config_info': [ + 'get_device_config_info', 'Get Device Config Info', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # **SET** + 'enable_block_device': [ + 'set_block_device_enable', 'Enable Access Control', { + 'enable': [ + '-e', '--enable', False, + 'store_true', 'This switch will enable, without will disable'], + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'block_device_status': [ + 'get_block_device_enable_status', 'Get Access Control Status', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # **SET** + 'block_device_cli': [ + 'set_block_device_by_mac', 'Allow/Block Device by MAC', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + 'mac': [ + '-m', '--mac', False, + False, 'MAC Address to Allow/Block'], + 'action': [ + '-a', '--action', ['allow', 'block'], + False, 'Action to take, Allow or Block'], + } + ], + # **SET** + 'enable_traffic_meter': [ + 'enable_traffic_meter', 'Enable/Disable Traffic Meter', + { + 'enable': [ + '-e', '--enable', False, + 'store_true', 'This switch will enable, without will disable'], + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'traffic_meter': [ + 'get_traffic_meter_statistics', 'Get Traffic Meter Statistics', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'traffic_meter_enabled': [ + 'get_traffic_meter_enabled', 'Get Traffic Meter Status', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'traffic_meter_options': [ + 'get_traffic_meter_options', 'Get Traffic Meter Options', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # --------------------- + # SERVICE_LAN_CONFIG_SECURITY + # --------------------- + # GET + 'get_lan_config_info': [ + 'get_lan_config_sec_info', 'Get LAN Config Sec Info', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # --------------------- + # SERVICE_WAN_IP_CONNECTION + # --------------------- + # GET + 'get_wan_ip_info': ['get_wan_ip_con_info', 'Get WAN IP Info', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # --------------------- + # SERVICE_PARENTAL_CONTROL + # --------------------- + # **SET** + 'enable_parental_control': [ + 'enable_parental_control', 'Enable/Disable Parental Control', + { + 'enable': [ + '-e', '--enable', False, + 'store_true', 'This switch will enable, without will disable'], + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'parental_control_status': [ + 'get_parental_control_enable_status', 'Get Parental Control Status', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'mac_address': [ + 'get_all_mac_addresses', 'Get all MAC Addresses', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'dns_masq': [ + 'get_dns_masq_device_id', 'Get DNS Masq Device ID', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # --------------------- + # SERVICE_DEVICE_INFO + # --------------------- + # GET + 'info': [ + 'get_info', 'Get Info', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'support_feature': [ + 'get_support_feature_list_XML', 'Get Supported Features', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'attached_devices': [ + 'get_attached_devices', 'Get Attached Devices', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + 'verbose': [ + '-v', '--verbose', False, + 'store_true', 'This switch will enable, without will disable'], + } + ], + # GET + 'attached_devices2': [ + 'get_attached_devices_2', 'Get Attached Devices 2', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # --------------------- + # SERVICE_ADVANCED_QOS + # --------------------- + # **SET** + 'speed_test_start': [ + 'set_speed_test_start', 'Start Speed Test', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'speed_test_result': [ + 'get_speed_test_result', 'Get Speed Test Results', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'qos_enabled': [ + 'get_qos_enable_status', 'Get QOS Status', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # **SET** + 'emable_qos': [ + 'set_qos_enable_status', 'Enable/Disable QOS', + { + 'enable': [ + '-e', '--enable', False, + 'store_true', 'This switch will enable, without will disable'], + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'bw_control': [ + 'get_bandwidth_control_options', 'Get Bandwidth Control Options', + { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # --------------------- + # SERVICE_WLAN_CONFIGURATION + # --------------------- + # **SET** + 'guest_access_enable': [ + 'set_guest_access_enabled', 'Enable/Disable Guest 2.4G Wifi', + { + 'enable': [ + '-e', '--enable', False, + 'store_true', 'This switch will enable, without will disable'], + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # **SET** + 'guest_access_enable2': [ + 'set_guest_access_enabled_2', 'Enable/Disable Guest 2.4G Wifi', + { + 'enable': [ + '-e', '--enable', False, + 'store_true', 'This switch will enable, without will disable'], + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'guest_access': [ + 'get_guest_access_enabled', 'Get 2G Guest Wifi Status', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # **SET** + 'guest_access_enable_5g': [ + 'set_5g_guest_access_enabled', 'Enable/Disable Guest 5G Wifi', + { + 'enable': [ + '-e', '--enable', False, + 'store_true', False], + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # **SET** + 'guest_access_enable_5g_2': [ + 'set_5g_guest_access_enabled_2', 'Enable/Disable Guest 5G Wifi', + { + 'enable': [ + '-e', '--enable', False, + 'store_true', False], + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # **SET** + 'guest_access_enable_5g_3': [ + 'set_5g_guest_access_enabled_3', 'Enable/Disable Guest 5G Wifi', + { + 'enable': [ + '-e', '--enable', False, + 'store_true', False], + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'guest_access_5g': [ + 'get_5g_guest_access_enabled', 'Get 5G Guest Wifi Status', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'wpa_key': [ + 'get_wpa_security_keys', 'Get 2G WPA Key', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'wpa_key_5g': [ + 'get_5g_wpa_security_keys', 'Get 5G WPA Key', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'get_2g_info': [ + 'get_2g_info', 'Get 2G Info', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'get_5g_info': [ + 'get_5g_info', 'Get 5G Info', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'get_channel': [ + 'get_available_channel', 'Get Channel', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'guest_access_net': [ + 'get_guest_access_network_info', 'Get 2G Guest Wifi Info', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'guest_access_net_5g': [ + 'get_5g_guest_access_network_info', 'Get 5G Guest Wifi Info', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # GET + 'get_smart_conn': ['get_smart_connect_enabled', 'Get Smart Conn Status', { + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], + # **SET** + 'set_smart_conn': [ + 'set_smart_connect_enabled', 'Enable/Disable Smart Connect', + { + 'enable': [ + '-e', '--enable', False, + 'store_true', False], + 'test': [ + '-t', '--test', False, + 'store_true', 'Output SOAP Response'], + } + ], +} diff --git a/pynetgear/const.py b/pynetgear/const.py new file mode 100644 index 0000000..5225f3c --- /dev/null +++ b/pynetgear/const.py @@ -0,0 +1,161 @@ +# encoding: utf-8 +"""Constants for pynetgear.""" + +# --------------------- +# DEFAULTS +# --------------------- +DEFAULT_HOST = 'routerlogin.net' +DEFAULT_USER = 'admin' +DEFAULT_PORT = 5000 + +BLOCK = "Block" +ALLOW = "Allow" + +UNKNOWN_DEVICE_DECODED = '' +UNKNOWN_DEVICE_ENCODED = '<unknown>' + +REGEX_ATTACHED_DEVICES = r"(.*)" + +# Until we know how to generate it, give the one we captured +SESSION_ID = "A7D88AE69687E58D9A00" + +# --------------------- +# SERVICE +# --------------------- +SERVICE_PREFIX = "urn:NETGEAR-ROUTER:service:" +SERVICE_DEVICE_INFO = "DeviceInfo:1" +SERVICE_DEVICE_CONFIG = "DeviceConfig:1" +SERVICE_LAN_CONFIG_SECURITY = "LANConfigSecurity:1" +SERVICE_WAN_IP_CONNECTION = "WANIPConnection:1" +SERVICE_PARENTAL_CONTROL = "ParentalControl:1" +SERVICE_ADVANCED_QOS = "AdvancedQoS:1" +SERVICE_WLAN_CONFIGURATION = "WLANConfiguration:1" +SERVICE_USER_OPTIONS_TC = "UserOptionsTC:1" + +# --------------------- +# SERVICE_DEVICE_CONFIG +# --------------------- +LOGIN = 'SOAPLogin' +# LOGOUT = 'SOAPLogout' +REBOOT = 'Reboot' +CHECK_NEW_FIRMWARE = 'CheckNewFirmware' +# UPDATE_NEW_FIRMWARE = 'UpdateNewFirmware' +CHECK_APP_NEW_FIRMWARE = 'CheckAppNewFirmware' # ***NEW*** +CONFIGURATION_STARTED = 'ConfigurationStarted' +CONFIGURATION_FINISHED = 'ConfigurationFinished' +GET_DEVICE_CONFIG_INFO = 'GetInfo' # ***NEW*** + +# BLOCK/ALLOW DEVICE +SET_BLOCK_DEVICE_ENABLE = 'SetBlockDeviceEnable' +GET_BLOCK_DEVICE_ENABLE_STATUS = 'GetBlockDeviceEnableStatus' +# ENABLE_BLOCK_DEVICE_FOR_ALL = 'EnableBlockDeviceForAll' # deprecated? +SET_BLOCK_DEVICE_BY_MAC = 'SetBlockDeviceByMAC' + +# TRAFFIC METER +GET_TRAFFIC_METER_STATISTICS = 'GetTrafficMeterStatistics' +ENABLE_TRAFFIC_METER = 'EnableTrafficMeter' +GET_TRAFFIC_METER_ENABLED = 'GetTrafficMeterEnabled' +# SET_TRAFFIC_METER_OPTIONS = 'SetTrafficMeterOptions' +GET_TRAFFIC_METER_OPTIONS = 'GetTrafficMeterOptions' + +# --------------------- +# SERVICE_LAN_CONFIG_SECURITY +# --------------------- +GET_LAN_CONFIG_SEC_INFO = 'GetInfo' # ***NEW*** + +# --------------------- +# SERVICE_WAN_IP_CONNECTION +# --------------------- +GET_WAN_IP_CON_INFO = 'GetInfo' # ***NEW*** + +# --------------------- +# SERVICE_PARENTAL_CONTROL +# --------------------- +LOGIN_OLD = 'Authenticate' +ENABLE_PARENTAL_CONTROL = 'EnableParentalControl' +GET_PARENTAL_CONTROL_ENABLE_STATUS = 'GetEnableStatus' +GET_ALL_MAC_ADDRESSES = 'GetAllMACAddresses' +# SET_DNS_MASQ_DEVICE_ID = 'SetDNSMasqDeviceID' +GET_DNS_MASQ_DEVICE_ID = 'GetDNSMasqDeviceID' +# DELETE_MAC_ADDRESS = 'DeleteMACAddress' + +# --------------------- +# SERVICE_DEVICE_INFO +# --------------------- +GET_INFO = 'GetInfo' +GET_SUPPORT_FEATURE_LIST_XML = 'GetSupportFeatureListXML' +GET_ATTACHED_DEVICES = 'GetAttachDevice' +GET_ATTACHED_DEVICES_2 = 'GetAttachDevice2' +# SET_DEVICE_NAME_ICON_BY_MAC = 'SetDeviceNameIconByMAC' +# SET_DEVICE_NAME = 'SetNetgearDeviceName' # ***NEW*** + +# --------------------- +# SERVICE_ADVANCED_QOS +# --------------------- +SET_SPEED_TEST_START = 'SetOOKLASpeedTestStart' +GET_SPEED_TEST_RESULT = 'GetOOKLASpeedTestResult' +SET_QOS_ENABLE_STATUS = 'SetQoSEnableStatus' +GET_QOS_ENABLE_STATUS = 'GetQoSEnableStatus' +# SET_BANDWIDTH_CONTROL_OPTIONS = 'SetBandwidthControlOptions' +GET_BANDWIDTH_CONTROL_OPTIONS = 'GetBandwidthControlOptions' +GET_CURRENT_APP_BANDWIDTH = 'GetCurrentAppBandwidth' # Not Working +GET_CURRENT_DEVICE_BANDWIDTH = 'GetCurrentDeviceBandwidth' # Not Working +GET_CURRENT_APP_BANDWIDTH_BY_MAC = 'GetCurrentAppBandwidthByMAC' # Not Working + +# --------------------- +# SERVICE_WLAN_CONFIGURATION +# --------------------- +SET_GUEST_ACCESS_ENABLED = 'SetGuestAccessEnabled' # 2.4G-1 R7800 +GET_GUEST_ACCESS_ENABLED = 'GetGuestAccessEnabled' # 2.4G-1 R7800/R8000 +SET_GUEST_ACCESS_ENABLED_2 = 'SetGuestAccessEnabled2' # 2.4G-1 R8000 +GET_GUEST_ACCESS_ENABLED_2 = 'GetGuestAccessEnabled2' # 2.4G-1 R8000 +SET_5G_GUEST_ACCESS_ENABLED = 'Set5GGuestAccessEnabled' # 5G-1 R7800 +GET_5G1_GUEST_ACCESS_ENABLED = 'Get5GGuestAccessEnabled' # 5G-1 R7800 +GET_5G1_GUEST_ACCESS_ENABLED_2 = 'Get5G1GuestAccessEnabled' # 5G-1 R8000 +SET_5G1_GUEST_ACCESS_ENABLED_2 = 'Set5G1GuestAccessEnabled2' # 5G-1 R8000 +SET_5G_GUEST_ACCESS_ENABLED_2 = 'Set5GGuestAccessEnabled2' # 5G-2 R8000 +GET_5G_GUEST_ACCESS_ENABLED_2 = 'Get5GGuestAccessEnabled2' # 5G-2 R8000 +GET_WPA_SECURITY_KEYS = 'GetWPASecurityKeys' +GET_5G_WPA_SECURITY_KEYS = 'Get5GWPASecurityKeys' +GET_2G_INFO = 'GetInfo' +GET_5G_INFO = 'Get5GInfo' +# SET_5G_WLAN_WPA_PSK_BY_PASSPHRASE = 'Set5GWLANWPAPSKByPassphrase' +GET_AVAILABLE_CHANNEL = 'GetAvailableChannel' +# SET_GUEST_ACCESS_NETWORK = 'SetGuestAccessNetwork' +GET_GUEST_ACCESS_NETWORK_INFO = 'GetGuestAccessNetworkInfo' +# SET_5G_GUEST_ACCESS_NETWORK = 'Set5GGuestAccessNetwork' +GET_5G_GUEST_ACCESS_NETWORK_INFO = 'Get5GGuestAccessNetworkInfo' +GET_SMART_CONNECT_ENABLED = 'IsSmartConnectEnabled' # ***NEW*** +SET_SMART_CONNECT_ENABLED = 'SetSmartConnectEnable' # ***NEW*** + +# --------------------- +# SERVICE_USER_OPTIONS_TC +# --------------------- +SET_USER_OPTION_TC = 'SetUserOptionsTC' # ***NEW*** + +# --------------------- +# FORMATTING +# --------------------- +SOAP_REQUEST = """ + + +{session_id} + +{body} + +""" + +LOGIN_V1_BODY = """ + + {username} + {password} + +""" + +CALL_BODY = """ + +{params} +""" diff --git a/pynetgear/helpers.py b/pynetgear/helpers.py new file mode 100644 index 0000000..0cea735 --- /dev/null +++ b/pynetgear/helpers.py @@ -0,0 +1,179 @@ +# encoding: utf-8 +"""Helper functions for pynetgear.""" +from io import StringIO +import xml.etree.ElementTree as ET +import re +import sys +import requests +import logging +from datetime import timedelta + +from .const import SERVICE_PREFIX # pylint: disable=relative-beyond-top-level + +_LOGGER = logging.getLogger(__name__) + +# define regex to filter invalid XML codes +# cf https://stackoverflow.com/questions/1707890/fast-way-to-filter-illegal-xml +# -unicode-chars-in-python +if sys.version_info[0] == 3: + unichr = chr +_illegal_unichrs = [(0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), + (0x7F, 0x84), (0x86, 0x9F), + (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF)] +if sys.maxunicode >= 0x10000: # not narrow build + _illegal_unichrs.extend([(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF), + (0x3FFFE, 0x3FFFF), (0x4FFFE, 0x4FFFF), + (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF), + (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF), + (0x9FFFE, 0x9FFFF), (0xAFFFE, 0xAFFFF), + (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF), + (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF), + (0xFFFFE, 0xFFFFF), (0x10FFFE, 0x10FFFF)]) + +_illegal_ranges = ["%s-%s" % (unichr(low), unichr(high)) + for (low, high) in _illegal_unichrs] + +_illegal_xml_chars_RE = re.compile(u'[%s]' % u''.join(_illegal_ranges)) + + +def to_get(parseNode, toParse, response): + """Create a dict of the node information.""" + success, theNode = find_node(response.text, parseNode) + + if not success: + return False + + theInfo = {} + + for x in toParse: + theItem = xml_get(theNode, x) + if theItem: + theInfo[x] = theItem + else: + theInfo[x] = None + + return theInfo + + +def autodetect_url(): + """ + Try to autodetect the base URL of the router SOAP service. + + Returns None if it can't be found. + """ + for url in ["http://routerlogin.net:5000", "https://routerlogin.net", + "http://routerlogin.net"]: + try: + r = requests.get(url + "/soap/server_sa/", + headers=get_soap_headers("Test:1", "test"), + verify=False) + if r.status_code == 200: + return url + except requests.exceptions.RequestException: + pass + + return None + + +def find_node(text, xpath): + """Look for a node in xml.""" + text = _illegal_xml_chars_RE.sub('', text) + it = ET.iterparse(StringIO(text)) + # strip all namespaces + for _, el in it: + if '}' in el.tag: + el.tag = el.tag.split('}', 1)[1] + node = it.root.find(xpath) + if node is None: + _LOGGER.error("Error finding node in XML response") + _LOGGER.debug(text) + return False, None + + return True, node + + +def xml_get(e, name): + """ + Return the value of the subnode "name" of element e. + + Return None if the subnode doesn't exist + """ + r = e.find(name) + if r is not None: + return r.text + return None + + +def get_soap_headers(service, method): + """Return Soap Headers.""" + action = SERVICE_PREFIX + service + "#" + method + return { + "SOAPAction": action, + "Cache-Control": "no-cache", + "User-Agent": "pynetgear", + "Content-Type": "multipart/form-data" + } + + +def is_valid_response(resp): + """Check if is valid.""" + return (resp.status_code == 200 and + ("0000" in resp.text or + "000" in resp.text or + # Speed Test Result + "2" in resp.text or + # dns_masq/mac_address + "001" in resp.text + )) + + +def is_unauthorized_response(resp): + """Check if is unauthorized.""" + return (resp.status_code == 401 or + "401" in resp.text) + + +def convert(value, to_type, default=None): + """Convert value to to_type, returns default if fails.""" + try: + return default if value is None else to_type(value) + except ValueError: + # If value could not be converted + return default + + +def parse_text(text): + """ + There are three kinds of values in the returned data. + + This function parses the different values and returns + (total, avg), timedelta or a plain float + """ + def tofloats(lst): + return (float(t) for t in lst) + try: + if "/" in text: # "6.19/0.88" total/avg + return tuple(tofloats(text.split('/'))) + + if ":" in text: # 11:14 hr:mn + hour, mins = tofloats(text.split(':')) + return timedelta(hours=hour, minutes=mins) + + return float(text) + except ValueError: + return None + + +def value_to_zero_or_one(s): + """Convert value to 1 or 0 string.""" + if isinstance(s, str): + if s.lower() in ('true', 't', 'yes', 'y', '1'): + return '1' + if s.lower() in ('false', 'f', 'no', 'n', '0'): + return '0' + if isinstance(s, bool): + if s: + return '1' + return '0' + + raise ValueError("Cannot covert {} to a 1 or 0".format(s))