diff --git a/pynintendoparental/__init__.py b/pynintendoparental/__init__.py index 7e58c23..36d4480 100644 --- a/pynintendoparental/__init__.py +++ b/pynintendoparental/__init__.py @@ -3,34 +3,33 @@ import asyncio -from pynintendoparental.exceptions import HttpException, NoDevicesFoundException +from pynintendoauth.exceptions import HttpException -from .authenticator import Authenticator from .api import Api from .const import _LOGGER from .device import Device +from .exceptions import NoDevicesFoundException +from .authenticator import Authenticator + class NintendoParental: """Core Python API.""" - def __init__(self, - auth: Authenticator, - timezone, - lang) -> None: + def __init__(self, auth: Authenticator, timezone, lang) -> None: self._api: Api = Api(auth=auth, tz=timezone, lang=lang) self.account_id = auth.account_id self.devices: dict[str, Device] = {} async def _get_devices(self): """Gets devices from the API and stores in self.devices""" + async def update_device(dev: Device): """Update a device.""" try: await dev.update() except Exception as err: - _LOGGER.exception("Error updating device %s: %s", - dev.device_id, - err) + _LOGGER.exception("Error updating device %s: %s", dev.device_id, err) + try: response = await self._api.async_get_account_devices() except HttpException as err: @@ -55,10 +54,9 @@ async def update(self): _LOGGER.debug("Update complete.") @classmethod - async def create(cls, - auth: Authenticator, - timezone: str = "Europe/London", - lang: str = "en-GB") -> 'NintendoParental': + async def create( + cls, auth: Authenticator, timezone: str = "Europe/London", lang: str = "en-GB" + ) -> "NintendoParental": """Create an instance of NintendoParental.""" self = cls(auth, timezone, lang) await self.update() diff --git a/pynintendoparental/_version.py b/pynintendoparental/_version.py index c57bfd5..6c8e6b9 100644 --- a/pynintendoparental/_version.py +++ b/pynintendoparental/_version.py @@ -1 +1 @@ -__version__ = '0.0.0' +__version__ = "0.0.0" diff --git a/pynintendoparental/api.py b/pynintendoparental/api.py index 0f15662..689e6d1 100644 --- a/pynintendoparental/api.py +++ b/pynintendoparental/api.py @@ -2,6 +2,8 @@ import aiohttp +from pynintendoauth.exceptions import HttpException + from .authenticator import Authenticator from .const import ( ENDPOINTS, @@ -13,13 +15,14 @@ OS_VERSION, OS_NAME, DEVICE_MODEL, - _LOGGER + _LOGGER, ) -from .exceptions import HttpException + def _check_http_success(status: int) -> bool: return status >= 200 and status < 300 + class Api: """Nintendo Parental Controls API.""" @@ -48,10 +51,10 @@ def _headers(self) -> dict: "X-Moon-TimeZone": self._tz, "X-Moon-Os-Language": self._language, "X-Moon-App-Language": self._language, - "Authorization": self._auth.access_token + "Authorization": self._auth.access_token, } - async def send_request(self, endpoint: str, body: object=None, **kwargs): + async def send_request(self, endpoint: str, body: object = None, **kwargs): """Sends a request to a given endpoint.""" _LOGGER.debug("Sending request to %s", endpoint) # Get the endpoint from the endpoints map @@ -66,46 +69,45 @@ async def send_request(self, endpoint: str, body: object=None, **kwargs): url = e_point.get("url").format(BASE_URL=BASE_URL, **kwargs) _LOGGER.debug("Built URL %s", url) # now send the HTTP request - resp: dict = { - "status": 0, - "text": "", - "json": "", - "headers": "" - } - async with self._auth.client_session.request( - method=e_point.get("method"), - url=url, - json=body, - headers=self._headers - ) as response: - _LOGGER.debug("%s request to %s status code %s", - e_point.get("method"), - url, - response.status) - if _check_http_success(response.status): - resp["status"] = response.status - resp["text"] = await response.text() + resp: dict = {"status": 0, "text": "", "json": "", "headers": ""} + response = await self._auth.async_authenticated_request( + method=e_point.get("method"), url=url, headers=self._headers, body=body + ) + _LOGGER.debug( + "%s request to %s status code %s", + e_point.get("method"), + url, + response.status, + ) + if not _check_http_success(response.status): + if response.content_type == "application/problem+json": try: - resp["json"] = await response.json() - except (aiohttp.ContentTypeError, ValueError) as e: - _LOGGER.warning( - """Failed to decode JSON response from %s. - Status: %s, Error: %s. - Response text: %s...""", - url, response.status, e, resp['text'][:200] - ) - resp["json"] = {} - resp["headers"] = response.headers - else: - if response.content_type == "application/problem+json": - try: - error = await response.json() - if "detail" in error: - raise HttpException(response.status, error["detail"], error.get("errorCode")) - except (aiohttp.ContentTypeError, ValueError): - # Fall through to the generic exception below on parsing failure. - pass - raise HttpException(response.status, await response.text()) + error: dict = await response.json() + if "detail" in error: + raise HttpException( + response.status, error["detail"], error.get("errorCode") + ) + except (aiohttp.ContentTypeError, ValueError): + # Fall through to the generic exception below on parsing failure. + pass + raise HttpException(response.status, await response.text()) + + resp["status"] = response.status + resp["text"] = await response.text() + try: + resp["json"] = await response.json() + except (aiohttp.ContentTypeError, ValueError) as e: + _LOGGER.warning( + """Failed to decode JSON response from %s. + Status: %s, Error: %s. + Response text: %s...""", + url, + response.status, + e, + resp["text"][:200], + ) + resp["json"] = {} + resp["headers"] = response.headers # now return the resp dict return resp @@ -113,109 +115,83 @@ async def send_request(self, endpoint: str, body: object=None, **kwargs): async def async_get_account_details(self) -> dict: """Get account details.""" return await self.send_request( - endpoint="get_account_details", - ACCOUNT_ID=self.account_id + endpoint="get_account_details", ACCOUNT_ID=self.account_id ) async def async_get_account_devices(self) -> dict: """Get account devices.""" - return await self.send_request( - endpoint="get_account_devices" - ) + return await self.send_request(endpoint="get_account_devices") async def async_get_account_device(self, device_id: str) -> dict: """Get account device.""" return await self.send_request( - endpoint="get_account_device", - DEVICE_ID=device_id + endpoint="get_account_device", DEVICE_ID=device_id ) async def async_get_device_daily_summaries(self, device_id: str) -> dict: """Get device daily summaries.""" return await self.send_request( - endpoint="get_device_daily_summaries", - DEVICE_ID=device_id + endpoint="get_device_daily_summaries", DEVICE_ID=device_id ) async def async_get_device_monthly_summaries(self, device_id: str) -> dict: """Get device monthly summaries.""" return await self.send_request( - endpoint="get_device_monthly_summaries", - DEVICE_ID=device_id + endpoint="get_device_monthly_summaries", DEVICE_ID=device_id ) async def async_get_device_parental_control_setting(self, device_id: str) -> dict: """Get device parental control setting.""" return await self.send_request( - endpoint="get_device_parental_control_setting", - DEVICE_ID=device_id + endpoint="get_device_parental_control_setting", DEVICE_ID=device_id ) - async def async_get_device_parental_control_setting_state(self, device_id: str) -> dict: + async def async_get_device_parental_control_setting_state( + self, device_id: str + ) -> dict: """Get device parental control setting state.""" return await self.send_request( - endpoint="get_device_parental_control_setting_state", - DEVICE_ID=device_id + endpoint="get_device_parental_control_setting_state", DEVICE_ID=device_id ) - async def async_get_device_monthly_summary(self, device_id: str, year: int, month: int) -> dict: + async def async_get_device_monthly_summary( + self, device_id: str, year: int, month: int + ) -> dict: """Get device monthly summary.""" return await self.send_request( endpoint="get_device_monthly_summary", DEVICE_ID=device_id, YEAR=year, - MONTH=f"{month:02d}" + MONTH=f"{month:02d}", ) - async def async_update_restriction_level( - self, - settings: dict - ) -> dict: + async def async_update_restriction_level(self, settings: dict) -> dict: """Update device restriction level.""" return await self.send_request( - endpoint="update_restriction_level", - body=settings + endpoint="update_restriction_level", body=settings ) - async def async_update_play_timer( - self, - settings: dict - ) -> dict: + async def async_update_play_timer(self, settings: dict) -> dict: """Update device play timer settings.""" - return await self.send_request( - endpoint="update_play_timer", - body=settings - ) + return await self.send_request(endpoint="update_play_timer", body=settings) - async def async_update_unlock_code( - self, - new_code: str, - device_id: str - ) -> dict: + async def async_update_unlock_code(self, new_code: str, device_id: str) -> dict: """Update device unlock code.""" return await self.send_request( endpoint="update_unlock_code", - body={ - "deviceId": device_id, - "unlockCode": new_code - } + body={"deviceId": device_id, "unlockCode": new_code}, ) async def async_update_extra_playing_time( - self, - device_id: str, - additional_time: int + self, device_id: str, additional_time: int ) -> dict: """Update device extra playing time.""" body = { "deviceId": device_id, "additionalTime": additional_time, - "status": "TO_ADDED" + "status": "TO_ADDED", } if additional_time == -1: body["status"] = "TO_INFINITY" body.pop("additionalTime") - return await self.send_request( - endpoint="update_extra_playing_time", - body=body - ) + return await self.send_request(endpoint="update_extra_playing_time", body=body) diff --git a/pynintendoparental/application.py b/pynintendoparental/application.py index c524f0a..f945e52 100644 --- a/pynintendoparental/application.py +++ b/pynintendoparental/application.py @@ -4,6 +4,7 @@ from .const import _LOGGER + class Application: """Model for an application""" @@ -12,7 +13,7 @@ def __init__(self) -> None: self.application_id: str = None self.first_played_date: datetime = None self.has_ugc: bool = None - self.image_url: str = None # uses small image from Nintendo + self.image_url: str = None # uses small image from Nintendo self.playing_days: int = None self.shop_url: str = None self.name: str = None @@ -20,11 +21,10 @@ def __init__(self) -> None: def update_today_time_played(self, daily_summary: dict): """Updates the today time played for the given application.""" - _LOGGER.debug("Updating today time played for app %s", - self.application_id) + _LOGGER.debug("Updating today time played for app %s", self.application_id) self.today_time_played = daily_summary.get("playingTime", 0) - def update(self, updated: 'Application'): + def update(self, updated: "Application"): """Updates self with a given application.""" _LOGGER.debug("Updating application %s", self.application_id) self.application_id = updated.application_id @@ -37,7 +37,7 @@ def update(self, updated: 'Application'): self.today_time_played = updated.today_time_played @classmethod - def from_daily_summary(cls, raw: list) -> list['Application']: + def from_daily_summary(cls, raw: list) -> list["Application"]: """Converts a raw daily summary response into a list of applications.""" built = [] if "playedApps" in raw: @@ -49,7 +49,7 @@ def from_daily_summary(cls, raw: list) -> list['Application']: return built @staticmethod - def check_if_app_in_list(app_list: list['Application'], app: 'Application') -> bool: + def check_if_app_in_list(app_list: list["Application"], app: "Application") -> bool: """Checks if an app is in a list.""" for app_li in app_list: if app_li.application_id == app.application_id: @@ -57,7 +57,9 @@ def check_if_app_in_list(app_list: list['Application'], app: 'Application') -> b return False @staticmethod - def return_app_from_list(app_list: list['Application'], application_id: str) -> 'Application': + def return_app_from_list( + app_list: list["Application"], application_id: str + ) -> "Application": """Returns a single app from a given list.""" for app in app_list: if app.application_id == application_id: @@ -65,28 +67,32 @@ def return_app_from_list(app_list: list['Application'], application_id: str) -> return None @classmethod - def from_whitelist(cls, raw: dict) -> list['Application']: + def from_whitelist(cls, raw: dict) -> list["Application"]: """Converts a raw whitelist response into a list of applications.""" parsed = [] for app_id in raw: _LOGGER.debug("Parsing app %s", app_id) internal = cls() internal.application_id = raw[app_id]["applicationId"] - internal.first_played_date = datetime.strptime(raw[app_id]["firstPlayDate"], "%Y-%m-%d") + internal.first_played_date = datetime.strptime( + raw[app_id]["firstPlayDate"], "%Y-%m-%d" + ) internal.image_url = raw[app_id]["imageUri"] internal.name = raw[app_id]["title"] parsed.append(internal) return parsed @classmethod - def from_monthly_summary(cls, raw: list) -> list['Application']: + def from_monthly_summary(cls, raw: list) -> list["Application"]: """Converts a raw monthly summary response into a list of applications.""" parsed = [] for app in raw: _LOGGER.debug("Parsing app %s", app) internal = cls() internal.application_id = app.get("applicationId").capitalize() - internal.first_played_date = datetime.strptime(app.get("firstPlayDate"), "%Y-%m-%d") + internal.first_played_date = datetime.strptime( + app.get("firstPlayDate"), "%Y-%m-%d" + ) internal.has_ugc = app.get("hasUgc", False) internal.image_url = app.get("imageUri").get("small") internal.playing_days = app.get("playingDays", None) diff --git a/pynintendoparental/authenticator.py b/pynintendoparental/authenticator.py new file mode 100644 index 0000000..bdaf8a7 --- /dev/null +++ b/pynintendoparental/authenticator.py @@ -0,0 +1,18 @@ +"""Nintendo Authentication.""" + +from __future__ import annotations + +from pynintendoauth import NintendoAuth + +from .const import CLIENT_ID + + +class Authenticator(NintendoAuth): + """Authentication functions.""" + + def __init__(self, session_token=None, client_session=None): + super().__init__( + client_id=CLIENT_ID, + session_token=session_token, + client_session=client_session, + ) diff --git a/pynintendoparental/authenticator/__init__.py b/pynintendoparental/authenticator/__init__.py deleted file mode 100644 index 3cc59ef..0000000 --- a/pynintendoparental/authenticator/__init__.py +++ /dev/null @@ -1,226 +0,0 @@ -"""Nintendo Authentication.""" -from __future__ import annotations - -import logging -import base64 -import hashlib -import random -import string - -from urllib.parse import urlencode, urlparse - -from datetime import datetime, timedelta - -import aiohttp - -from pynintendoparental.exceptions import ( - HttpException, - InvalidOAuthConfigurationException, - InvalidSessionTokenException -) -from .const import ( - TOKEN_URL, - SESSION_TOKEN_URL, - CLIENT_ID, - GRANT_TYPE, - MY_ACCOUNT_ENDPOINT, - REDIRECT_URI, - SCOPES, - AUTHORIZE_URL -) - -_LOGGER = logging.getLogger(__name__) - -def _parse_response_token(token: str) -> dict: - """Parses a response token.""" - _LOGGER.debug(">> Parsing response token.") - try: - url = urlparse(token) - params = url.fragment.split('&') - response = {} - for param in params: - response = { - **response, - param.split('=')[0]: param.split('=')[1] - } - return response - except Exception as exc: - raise ValueError("Invalid token provided.") from exc - -def _hash(text: str): - """Hash given text for login.""" - text = hashlib.sha256(text.encode()).digest() - text = base64.urlsafe_b64encode(text).decode() - return text.replace("=", "") - -def _rand(): - return ''.join(random.choice(string.ascii_letters) for _ in range(50)) - -class Authenticator: - """Authentication functions.""" - - def __init__( - self, - session_token = None, - auth_code_verifier = None, - client_session: aiohttp.ClientSession = None - ): - """Basic init.""" - _LOGGER.debug(">> Init authenticator.") - self._at_expiry: datetime = None - self._access_token: str = None - self.available_scopes: dict = None - self.account_id: str = None - self.account: dict = None - self._auth_code_verifier: str = auth_code_verifier - self._refresh_token: str = None - self._id_token: str = None - self._session_token: str = session_token - self.login_url: str = None - if client_session is None: - client_session = aiohttp.ClientSession() - self.client_session: aiohttp.ClientSession = client_session - - @property - def get_session_token(self) -> str: - """Return the session token.""" - return self._session_token - - @property - def access_token(self) -> str: - """Return the formatted access token.""" - return f"Bearer {self._id_token}" # v2 seems to use ID token for API access? - - @property - def access_token_expired(self) -> bool: - """Check if the access token has expired.""" - return self._at_expiry < (datetime.now()+timedelta(minutes=1)) - - async def _request_handler(self, method, url, json=None, data=None, headers: dict=None): - """Send a HTTP request""" - if headers is None: - headers = {} - response: dict = { - "status": 0, - "text": "", - "json": "", - "headers": "" - } - async with self.client_session.request( - method=method, - url=url, - json=json, - data=data, - headers=headers - ) as resp: - response["status"] = resp.status - response["text"] = await resp.text() - response["json"] = await resp.json() - response["headers"] = resp.headers - return response - - def _read_tokens(self, tokens: dict): - """Reads tokens into self.""" - self.available_scopes = tokens.get("scope") - self._at_expiry = datetime.now() + timedelta(seconds=tokens.get("expires_in")) - self._id_token = tokens.get("id_token") - self._access_token = tokens.get("access_token") - - async def perform_login(self, session_token_code): - """Retrieves initial tokens.""" - _LOGGER.debug("Performing initial login.") - session_token_form = aiohttp.FormData() - session_token_form.add_field("client_id", CLIENT_ID) - session_token_form.add_field("session_token_code", session_token_code) - session_token_form.add_field("session_token_code_verifier", self._auth_code_verifier) - session_token_response = await self._request_handler( - method="POST", - url=SESSION_TOKEN_URL, - data=session_token_form - ) - - if session_token_response.get("status") != 200: - raise HttpException(session_token_response.get("status"), - session_token_response.get("text")) - - self._session_token = session_token_response["json"]["session_token"] - - async def perform_refresh(self): - """Refresh the access token.""" - _LOGGER.debug("Refreshing access token.") - token_response = await self._request_handler( - method="POST", - url=TOKEN_URL, - json={ - "client_id": CLIENT_ID, - "grant_type": GRANT_TYPE, - "session_token": self.get_session_token - } - ) - - if token_response["status"] == 400: - raise InvalidSessionTokenException(400, token_response["json"]["error"]) - - if token_response["status"] == 401: - raise InvalidOAuthConfigurationException(401, token_response["json"]["error"]) - - if token_response.get("status") != 200: - raise HttpException(token_response.get("status"), f"login error {token_response.get('status')}") - - self._read_tokens(token_response.get("json")) - if self.account_id is None: - # fill account_id - account = await self._request_handler( - method="GET", - url=MY_ACCOUNT_ENDPOINT, - headers={ - "Authorization": f"Bearer {self._access_token}" - } - ) - if account["status"] != 200: - raise HttpException(account["status"], f"Unable to get account_id {account['status']}") - self.account_id = account["json"]["id"] - self.account = account["json"] - - @classmethod - def generate_login( - cls, - client_session: aiohttp.ClientSession | None = None) -> 'Authenticator': - """Starts configuration of the authenticator.""" - verifier = _rand() - - auth = cls(auth_code_verifier=verifier, client_session=client_session) - - query = { - "client_id": CLIENT_ID, - # "interacted": 1, - "redirect_uri": REDIRECT_URI, - "response_type": "session_token_code", - "scope": "+".join(SCOPES), - "session_token_code_challenge": _hash(verifier), - "session_token_code_challenge_method": "S256", - "state": _rand(), - "theme": "login_form" - } - - auth.login_url = AUTHORIZE_URL.format(urlencode(query)).replace("%2B", "+") - return auth - - @classmethod - async def complete_login(cls, - auth: Authenticator | None, - response_token: str, - is_session_token: bool=False, - client_session: aiohttp.ClientSession | None = None) -> Authenticator: - """Creates and logs into Nintendo APIs""" - if is_session_token: - auth = cls(session_token=response_token, client_session=client_session) - await auth.perform_refresh() - else: - response_token = _parse_response_token(response_token) - await auth.perform_login( - session_token_code=response_token.get("session_token_code") - ) - await auth.perform_refresh() - - return auth diff --git a/pynintendoparental/authenticator/const.py b/pynintendoparental/authenticator/const.py deleted file mode 100644 index 082d90d..0000000 --- a/pynintendoparental/authenticator/const.py +++ /dev/null @@ -1,29 +0,0 @@ -# pylint: disable=line-too-long -"""Static values.""" - -CLIENT_ID = "54789befb391a838" -GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer-session-token" - -REDIRECT_URI = f"npf{CLIENT_ID}://auth" -SCOPES = [ - "openid", - "user", - "user.mii", - "moonUser:administration", - "moonDevice:create", - "moonOwnedDevice:administration", - "moonParentalControlSetting", - "moonParentalControlSetting:update", - "moonParentalControlSettingState", - "moonPairingState", - "moonSmartDevice:administration", - "moonDailySummary", - "moonMonthlySummary", -] - -AUTHORIZE_URL = "https://accounts.nintendo.com/connect/1.0.0/authorize?{}" -SESSION_TOKEN_URL = "https://accounts.nintendo.com/connect/1.0.0/api/session_token" -TOKEN_URL = "https://accounts.nintendo.com/connect/1.0.0/api/token" - -ACCOUNT_API_BASE = "https://api.accounts.nintendo.com/2.0.0" -MY_ACCOUNT_ENDPOINT = f"{ACCOUNT_API_BASE}/users/me" diff --git a/pynintendoparental/const.py b/pynintendoparental/const.py index 7b936a5..3fb3168 100644 --- a/pynintendoparental/const.py +++ b/pynintendoparental/const.py @@ -4,6 +4,7 @@ import logging _LOGGER = logging.getLogger(__package__) +CLIENT_ID = "54789befb391a838" MOBILE_APP_PKG = "com.nintendo.znma" MOBILE_APP_VERSION = "2.2.0" MOBILE_APP_BUILD = "560" @@ -14,47 +15,55 @@ BASE_URL = "https://app.lp1.znma.srv.nintendo.net/v2" USER_AGENT = f"moon_ANDROID/{MOBILE_APP_VERSION} ({MOBILE_APP_PKG}; build:{MOBILE_APP_BUILD}; {OS_STR})" -DAYS_OF_WEEK = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"] +DAYS_OF_WEEK = [ + "monday", + "tuesday", + "wednesday", + "thursday", + "friday", + "saturday", + "sunday", +] ENDPOINTS = { "get_account_devices": { "url": "{BASE_URL}/actions/user/fetchOwnedDevices", - "method": "GET" + "method": "GET", }, "get_account_device": { "url": "{BASE_URL}/actions/user/fetchOwnedDevice?deviceId={DEVICE_ID}", - "method": "GET" + "method": "GET", }, "get_device_daily_summaries": { "url": "{BASE_URL}/actions/playSummary/fetchDailySummaries?deviceId={DEVICE_ID}", - "method": "GET" + "method": "GET", }, "get_device_monthly_summaries": { "url": "{BASE_URL}/actions/playSummary/fetchLatestMonthlySummary?deviceId={DEVICE_ID}", - "method": "GET" + "method": "GET", }, "get_device_parental_control_setting": { "url": "{BASE_URL}/actions/parentalControlSetting/fetchParentalControlSetting?deviceId={DEVICE_ID}", - "method": "GET" + "method": "GET", }, "update_restriction_level": { "url": "{BASE_URL}/actions/parentalControlSetting/updateRestrictionLevel", - "method": "POST" + "method": "POST", }, "update_play_timer": { "url": "{BASE_URL}/actions/parentalControlSetting/updatePlayTimer", - "method": "POST" + "method": "POST", }, "update_unlock_code": { "url": "{BASE_URL}/actions/parentalControlSetting/updateUnlockCode", - "method": "POST" + "method": "POST", }, "get_device_monthly_summary": { "url": "{BASE_URL}/actions/playSummary/fetchMonthlySummary?deviceId={DEVICE_ID}&year={YEAR}&month={MONTH}&containLatest=false", - "method": "GET" + "method": "GET", }, "update_extra_playing_time": { "url": "{BASE_URL}/actions/device/updateExtraPlayingTime", - "method": "POST" - } + "method": "POST", + }, } diff --git a/pynintendoparental/device.py b/pynintendoparental/device.py index 068aec2..ea16e92 100644 --- a/pynintendoparental/device.py +++ b/pynintendoparental/device.py @@ -6,14 +6,20 @@ from datetime import datetime, timedelta, time from typing import Callable +from pynintendoauth.exceptions import HttpException + from .api import Api from .const import _LOGGER, DAYS_OF_WEEK -from .exceptions import BedtimeOutOfRangeError, DailyPlaytimeOutOfRangeError, HttpException +from .exceptions import ( + BedtimeOutOfRangeError, + DailyPlaytimeOutOfRangeError, +) from .enum import AlarmSettingState, DeviceTimerMode, RestrictionMode from .player import Player from .utils import is_awaitable from .application import Application + class Device: """A device""" @@ -52,10 +58,7 @@ def __init__(self, api): @property def model(self) -> str: """Return the model.""" - model_map = { - "P00": "Switch", - "P01": "Switch 2" - } + model_map = {"P00": "Switch", "P01": "Switch 2"} return model_map.get(self.generation, "Unknown") @property @@ -68,10 +71,10 @@ async def update(self): _LOGGER.debug(">> Device.update()") now = datetime.now() await asyncio.gather( - self._get_daily_summaries(now), - self._get_parental_control_setting(now), - self.get_monthly_summary(), - self._get_extras() + self._get_daily_summaries(now), + self._get_parental_control_setting(now), + self.get_monthly_summary(), + self._get_extras(), ) if not self.players: self.players = Player.from_device_daily_summary(self.daily_summaries) @@ -114,9 +117,7 @@ async def set_new_pin(self, pin: str): """Updates the pin for the device.""" _LOGGER.debug(">> Device.set_new_pin(pin=REDACTED)") await self._send_api_update( - self._api.async_update_unlock_code, - new_code=pin, - device_id=self.device_id + self._api.async_update_unlock_code, new_code=pin, device_id=self.device_id ) async def add_extra_time(self, minutes: int): @@ -129,25 +130,30 @@ async def add_extra_time(self, minutes: int): async def set_restriction_mode(self, mode: RestrictionMode): """Updates the restriction mode of the device.""" _LOGGER.debug(">> Device.set_restriction_mode(mode=%s)", mode) - self.parental_control_settings["playTimerRegulations"]["restrictionMode"] = str(mode) + self.parental_control_settings["playTimerRegulations"]["restrictionMode"] = str( + mode + ) response = await self._api.async_update_play_timer( settings={ "deviceId": self.device_id, - "playTimerRegulations": self.parental_control_settings["playTimerRegulations"] + "playTimerRegulations": self.parental_control_settings[ + "playTimerRegulations" + ], } ) now = datetime.now() - self._parse_parental_control_setting(response["json"], now) # Don't need to recalculate times + self._parse_parental_control_setting( + response["json"], now + ) # Don't need to recalculate times await self._execute_callbacks() async def set_bedtime_alarm(self, value: time): """Update the bedtime alarm for the device.""" - _LOGGER.debug(">> Device.set_bedtime_alarm(value=%s)", - value) + _LOGGER.debug(">> Device.set_bedtime_alarm(value=%s)", value) if not ( - (16 <= value.hour <= 22) or - (value.hour == 23 and value.minute == 0) or - (value.hour == 0 and value.minute == 0) + (16 <= value.hour <= 22) + or (value.hour == 23 and value.minute == 0) + or (value.hour == 0 and value.minute == 0) ): raise BedtimeOutOfRangeError(value=value) now = datetime.now() @@ -157,24 +163,25 @@ async def set_bedtime_alarm(self, value: time): if bedtime["enabled"]: bedtime = { **bedtime, - "endingTime": { - "hour": value.hour, - "minute": value.minute - } + "endingTime": {"hour": value.hour, "minute": value.minute}, } if self.timer_mode == DeviceTimerMode.DAILY: - self.parental_control_settings["playTimerRegulations"]["dailyRegulations"]["bedtime"] = bedtime + self.parental_control_settings["playTimerRegulations"]["dailyRegulations"][ + "bedtime" + ] = bedtime else: - self.parental_control_settings["playTimerRegulations"]["eachDayOfTheWeekRegulations"][ - DAYS_OF_WEEK[now.weekday()] - ]["bedtime"] = bedtime + self.parental_control_settings["playTimerRegulations"][ + "eachDayOfTheWeekRegulations" + ][DAYS_OF_WEEK[now.weekday()]]["bedtime"] = bedtime await self._send_api_update( self._api.async_update_play_timer, settings={ "deviceId": self.device_id, - "playTimerRegulations": self.parental_control_settings["playTimerRegulations"] + "playTimerRegulations": self.parental_control_settings[ + "playTimerRegulations" + ], }, - now=now + now=now, ) async def set_timer_mode(self, mode: DeviceTimerMode): @@ -186,14 +193,15 @@ async def set_timer_mode(self, mode: DeviceTimerMode): self._api.async_update_play_timer, settings={ "deviceId": self.device_id, - "playTimerRegulations": self.parental_control_settings["playTimerRegulations"] - } + "playTimerRegulations": self.parental_control_settings[ + "playTimerRegulations" + ], + }, ) async def update_max_daily_playtime(self, minutes: int | float = 0): """Updates the maximum daily playtime of a device.""" - _LOGGER.debug(">> Device.update_max_daily_playtime(minutes=%s)", - minutes) + _LOGGER.debug(">> Device.update_max_daily_playtime(minutes=%s)", minutes) if isinstance(minutes, float): minutes = int(minutes) if minutes > 360 or minutes < -1: @@ -207,39 +215,63 @@ async def update_max_daily_playtime(self, minutes: int | float = 0): _LOGGER.debug( "Setting timeToPlayInOneDay.limitTime for device %s to value %s", self.device_id, - minutes) - self.parental_control_settings["playTimerRegulations"]["dailyRegulations"]["timeToPlayInOneDay"]["enabled"] = ttpiod - if "limitTime" in self.parental_control_settings["playTimerRegulations"]["dailyRegulations"]["timeToPlayInOneDay"] and minutes is None: - self.parental_control_settings["playTimerRegulations"]["dailyRegulations"]["timeToPlayInOneDay"].pop("limitTime") + minutes, + ) + self.parental_control_settings["playTimerRegulations"]["dailyRegulations"][ + "timeToPlayInOneDay" + ]["enabled"] = ttpiod + if ( + "limitTime" + in self.parental_control_settings["playTimerRegulations"][ + "dailyRegulations" + ]["timeToPlayInOneDay"] + and minutes is None + ): + self.parental_control_settings["playTimerRegulations"][ + "dailyRegulations" + ]["timeToPlayInOneDay"].pop("limitTime") else: - self.parental_control_settings["playTimerRegulations"]["dailyRegulations"]["timeToPlayInOneDay"]["limitTime"] = minutes + self.parental_control_settings["playTimerRegulations"][ + "dailyRegulations" + ]["timeToPlayInOneDay"]["limitTime"] = minutes else: _LOGGER.debug( "Setting timeToPlayInOneDay.limitTime for device %s to value %s", self.device_id, - minutes + minutes, ) - day_of_week_regs = self.parental_control_settings["playTimerRegulations"]["eachDayOfTheWeekRegulations"] + day_of_week_regs = self.parental_control_settings["playTimerRegulations"][ + "eachDayOfTheWeekRegulations" + ] current_day = DAYS_OF_WEEK[now.weekday()] day_of_week_regs[current_day]["timeToPlayInOneDay"]["enabled"] = ttpiod - if "limitTime" in day_of_week_regs[current_day]["timeToPlayInOneDay"] and minutes is None: + if ( + "limitTime" in day_of_week_regs[current_day]["timeToPlayInOneDay"] + and minutes is None + ): day_of_week_regs[current_day]["timeToPlayInOneDay"].pop("limitTime") else: - day_of_week_regs[current_day]["timeToPlayInOneDay"]["limitTime"] = minutes + day_of_week_regs[current_day]["timeToPlayInOneDay"]["limitTime"] = ( + minutes + ) await self._send_api_update( self._api.async_update_play_timer, settings={ "deviceId": self.device_id, - "playTimerRegulations": self.parental_control_settings["playTimerRegulations"] + "playTimerRegulations": self.parental_control_settings[ + "playTimerRegulations" + ], }, - now=now + now=now, ) def _update_applications(self): """Updates applications from daily summary.""" _LOGGER.debug(">> Device._update_applications()") - parsed_apps = Application.from_whitelist(self.parental_control_settings.get("whitelistedApplications", [])) + parsed_apps = Application.from_whitelist( + self.parental_control_settings.get("whitelistedApplications", []) + ) for app in parsed_apps: try: self.get_application(app.application_id).update(app) @@ -250,9 +282,13 @@ def _update_applications(self): def _get_today_regulation(self, now: datetime) -> dict: """Returns the regulation settings for the current day.""" if self.timer_mode == DeviceTimerMode.EACH_DAY_OF_THE_WEEK: - day_of_week_regs = self.parental_control_settings["playTimerRegulations"].get("eachDayOfTheWeekRegulations", {}) + day_of_week_regs = self.parental_control_settings[ + "playTimerRegulations" + ].get("eachDayOfTheWeekRegulations", {}) return day_of_week_regs.get(DAYS_OF_WEEK[now.weekday()], {}) - return self.parental_control_settings.get("playTimerRegulations", {}).get("dailyRegulations", {}) + return self.parental_control_settings.get("playTimerRegulations", {}).get( + "dailyRegulations", {} + ) def _parse_parental_control_setting(self, pcs: dict, now: datetime): """Parse a parental control setting request response.""" @@ -260,13 +296,23 @@ def _parse_parental_control_setting(self, pcs: dict, now: datetime): self.parental_control_settings = pcs["parentalControlSetting"] # Clean up bedtimeStartingTime if it's empty - if "bedtimeStartingTime" in self.parental_control_settings["playTimerRegulations"]: - if self.parental_control_settings["playTimerRegulations"].get("bedtimeStartingTime", {}).get("hour", 0) == 0: - self.parental_control_settings["playTimerRegulations"].pop("bedtimeStartingTime") + if ( + "bedtimeStartingTime" + in self.parental_control_settings["playTimerRegulations"] + ): + if ( + self.parental_control_settings["playTimerRegulations"] + .get("bedtimeStartingTime", {}) + .get("hour", 0) + == 0 + ): + self.parental_control_settings["playTimerRegulations"].pop( + "bedtimeStartingTime" + ) - self.forced_termination_mode = ( - self.parental_control_settings["playTimerRegulations"]["restrictionMode"] == str(RestrictionMode.FORCED_TERMINATION) - ) + self.forced_termination_mode = self.parental_control_settings[ + "playTimerRegulations" + ]["restrictionMode"] == str(RestrictionMode.FORCED_TERMINATION) # Update limit and bedtime from regulations self.timer_mode = DeviceTimerMode( @@ -280,7 +326,7 @@ def _parse_parental_control_setting(self, pcs: dict, now: datetime): if bedtime_setting.get("enabled"): self.bedtime_alarm = time( hour=bedtime_setting["endingTime"]["hour"], - minute=bedtime_setting["endingTime"]["minute"] + minute=bedtime_setting["endingTime"]["minute"], ) else: self.bedtime_alarm = time(hour=0, minute=0) @@ -303,8 +349,10 @@ def _calculate_times(self, now: datetime): self.today_playing_time = self.daily_summaries[0].get("playingTime") or 0 self.today_disabled_time = self.daily_summaries[0].get("disabledTime") or 0 self.today_exceeded_time = self.daily_summaries[0].get("exceededTime") or 0 - _LOGGER.debug("Cached playing, disabled and exceeded time for today for device %s", - self.device_id) + _LOGGER.debug( + "Cached playing, disabled and exceeded time for today for device %s", + self.device_id, + ) self._calculate_today_remaining_time(now) month_playing_time: int = 0 @@ -319,56 +367,79 @@ def _calculate_times(self, now: datetime): for app in parsed_apps: try: int_app = self.get_application(app.application_id) - _LOGGER.debug("Updating cached app state %s for device %s", - int_app.application_id, - self.device_id) + _LOGGER.debug( + "Updating cached app state %s for device %s", + int_app.application_id, + self.device_id, + ) int_app.update(app) except ValueError: - _LOGGER.debug("Creating new cached application entry %s for device %s", - app.application_id, - self.device_id) + _LOGGER.debug( + "Creating new cached application entry %s for device %s", + app.application_id, + self.device_id, + ) self.applications.append(app) # update application playtime try: for player in self.get_date_summary()[0].get("devicePlayers", []): for app in player.get("playedApps", []): - self.get_application(app["applicationId"]).update_today_time_played(app) + self.get_application(app["applicationId"]).update_today_time_played( + app + ) self.application_update_failed = False except ValueError as err: - _LOGGER.debug("Unable to retrieve applications for device %s: %s", self.name, err) + _LOGGER.debug( + "Unable to retrieve applications for device %s: %s", self.name, err + ) self.application_update_failed = True def _calculate_today_remaining_time(self, now: datetime): """Calculates the remaining playing time for today.""" - self.stats_update_failed = True # Assume failure until success + self.stats_update_failed = True # Assume failure until success try: - minutes_in_day = 1440 # 24 * 60 + minutes_in_day = 1440 # 24 * 60 current_minutes_past_midnight = now.hour * 60 + now.minute if self.limit_time in (-1, None): # No play limit, so remaining time is until end of day. - time_remaining_by_play_limit = minutes_in_day - current_minutes_past_midnight + time_remaining_by_play_limit = ( + minutes_in_day - current_minutes_past_midnight + ) else: time_remaining_by_play_limit = self.limit_time - self.today_playing_time # 2. Calculate remaining time until bedtime - if self.bedtime_alarm and self.bedtime_alarm != time(hour=0, minute=0) and self.alarms_enabled: + if ( + self.bedtime_alarm + and self.bedtime_alarm != time(hour=0, minute=0) + and self.alarms_enabled + ): bedtime_dt = datetime.combine(now.date(), self.bedtime_alarm) - if bedtime_dt > now: # Bedtime is in the future today + if bedtime_dt > now: # Bedtime is in the future today time_remaining_by_bedtime = (bedtime_dt - now).total_seconds() / 60 - else: # Bedtime has passed + else: # Bedtime has passed time_remaining_by_bedtime = 0.0 else: - time_remaining_by_bedtime = minutes_in_day - current_minutes_past_midnight + time_remaining_by_bedtime = ( + minutes_in_day - current_minutes_past_midnight + ) # Effective remaining time is the minimum of the two constraints - effective_remaining_time = min(time_remaining_by_play_limit, time_remaining_by_bedtime) + effective_remaining_time = min( + time_remaining_by_play_limit, time_remaining_by_bedtime + ) self.today_time_remaining = int(max(0.0, effective_remaining_time)) - _LOGGER.debug("Calculated today's remaining time: %s minutes", self.today_time_remaining) + _LOGGER.debug( + "Calculated today's remaining time: %s minutes", + self.today_time_remaining, + ) self.stats_update_failed = False except (ValueError, TypeError, AttributeError) as err: - _LOGGER.warning("Unable to calculate remaining time for device %s: %s", self.name, err) + _LOGGER.warning( + "Unable to calculate remaining time for device %s: %s", self.name, err + ) async def _get_parental_control_setting(self, now: datetime): """Retreives parental control settings from the API.""" @@ -383,7 +454,7 @@ async def _get_daily_summaries(self, now: datetime): """Retrieve daily summaries.""" _LOGGER.debug(">> Device._get_daily_summaries()") response = await self._api.async_get_device_daily_summaries( - device_id = self.device_id + device_id=self.device_id ) self.daily_summaries = response["json"]["dailySummaries"] _LOGGER.debug("New daily summary %s", self.daily_summaries) @@ -395,14 +466,16 @@ async def _get_extras(self): if self.alarms_enabled is not None: # first refresh can come from self.extra without http request response = await self._api.async_get_account_device( - device_id = self.device_id + device_id=self.device_id ) self.extra = response["json"]["ownedDevice"]["device"] status = self.extra["alarmSetting"]["visibility"] self.alarms_enabled = status == str(AlarmSettingState.VISIBLE) - _LOGGER.debug("Cached alarms enabled to state %s for device %s", - self.alarms_enabled, - self.device_id) + _LOGGER.debug( + "Cached alarms enabled to state %s for device %s", + self.alarms_enabled, + self.device_id, + ) async def get_monthly_summary(self, search_date: datetime = None) -> dict | None: """Gets the monthly summary.""" @@ -420,56 +493,71 @@ async def get_monthly_summary(self, search_date: datetime = None) -> dict | None available_summaries = response["json"]["available"] _LOGGER.debug("Available monthly summaries: %s", available_summaries) if not available_summaries: - _LOGGER.debug("No monthly summaries available for device %s", self.device_id) + _LOGGER.debug( + "No monthly summaries available for device %s", self.device_id + ) return None # Use the most recent available summary available_summary = available_summaries[0] - search_date = datetime.strptime(f"{available_summary['year']}-{available_summary['month']}-01", "%Y-%m-%d") - _LOGGER.debug("Using search date %s for monthly summary request", search_date) + search_date = datetime.strptime( + f"{available_summary['year']}-{available_summary['month']}-01", + "%Y-%m-%d", + ) + _LOGGER.debug( + "Using search date %s for monthly summary request", search_date + ) latest = True try: response = await self._api.async_get_device_monthly_summary( - device_id=self.device_id, - year=search_date.year, - month=search_date.month + device_id=self.device_id, year=search_date.year, month=search_date.month ) except HttpException as exc: - _LOGGER.warning("HTTP Exception raised while getting monthly summary for device %s: %s", - self.device_id, - exc) + _LOGGER.warning( + "HTTP Exception raised while getting monthly summary for device %s: %s", + self.device_id, + exc, + ) return None else: - _LOGGER.debug("Monthly summary query complete for device %s: %s", - self.device_id, - response["json"]["summary"]) + _LOGGER.debug( + "Monthly summary query complete for device %s: %s", + self.device_id, + response["json"]["summary"], + ) if latest: self.last_month_summary = summary = response["json"]["summary"] return summary return response["json"]["summary"] - def get_date_summary(self, input_date: datetime = datetime.now()) -> dict: """Returns usage for a given date.""" if not self.daily_summaries: raise ValueError("No daily summaries available to search.") summary = [ - x for x in self.daily_summaries - if x["date"] == input_date.strftime('%Y-%m-%d') + x + for x in self.daily_summaries + if x["date"] == input_date.strftime("%Y-%m-%d") ] if len(summary) == 0: input_date -= timedelta(days=1) summary = [ - x for x in self.daily_summaries - if x["date"] == input_date.strftime('%Y-%m-%d') - ] + x + for x in self.daily_summaries + if x["date"] == input_date.strftime("%Y-%m-%d") + ] if len(summary) == 0: - raise ValueError(f"A summary for the given date {input_date} does not exist") + raise ValueError( + f"A summary for the given date {input_date} does not exist" + ) return summary def get_application(self, application_id: str) -> Application: """Returns a single application.""" - app = next((app for app in self.applications if app.application_id == application_id), None) + app = next( + (app for app in self.applications if app.application_id == application_id), + None, + ) if app: return app raise ValueError(f"Application with id {application_id} not found.") @@ -482,7 +570,7 @@ def get_player(self, player_id: str) -> Player: raise ValueError(f"Player with id {player_id} not found.") @classmethod - async def from_devices_response(cls, raw: dict, api) -> list['Device']: + async def from_devices_response(cls, raw: dict, api) -> list["Device"]: """Parses a device request response body.""" _LOGGER.debug("Parsing device list response") if "ownedDevices" not in raw.keys(): @@ -500,7 +588,7 @@ async def from_devices_response(cls, raw: dict, api) -> list['Device']: return devices @classmethod - def from_device_response(cls, raw: dict, api) -> 'Device': + def from_device_response(cls, raw: dict, api) -> "Device": """Parses a single device request response body.""" _LOGGER.debug("Parsing device response") if "deviceId" not in raw.keys(): diff --git a/pynintendoparental/enum.py b/pynintendoparental/enum.py index 97da549..aff6848 100644 --- a/pynintendoparental/enum.py +++ b/pynintendoparental/enum.py @@ -2,8 +2,10 @@ from enum import Enum, StrEnum + class AlarmSettingState(Enum): """Alarm setting states.""" + SUCCESS = 0 TO_VISIBLE = 1 TO_INVISIBLE = 2 @@ -13,16 +15,20 @@ class AlarmSettingState(Enum): def __str__(self) -> str: return self.name + class RestrictionMode(Enum): """Restriction modes.""" + FORCED_TERMINATION = 0 ALARM = 1 def __str__(self) -> str: return self.name + class DeviceTimerMode(StrEnum): """Device timer modes.""" + DAILY = "DAILY" EACH_DAY_OF_THE_WEEK = "EACH_DAY_OF_THE_WEEK" diff --git a/pynintendoparental/exceptions.py b/pynintendoparental/exceptions.py index 29ae68b..7daffd8 100644 --- a/pynintendoparental/exceptions.py +++ b/pynintendoparental/exceptions.py @@ -8,31 +8,13 @@ class RangeErrorKeys(StrEnum): DAILY_PLAYTIME = "daily_playtime_out_of_range" BEDTIME = "bedtime_alarm_out_of_range" -class HttpException(Exception): - """A HTTP error occured""" - def __init__(self, status_code: int, message: str, error_code: str | None = None) -> None: - """Initialize the exception.""" - super().__init__(message) - self.status_code = status_code - self.message = message - self.error_code = error_code - - def __str__(self) -> str: - if self.error_code: - return f"HTTP {self.status_code}: {self.message} ({self.error_code})" - return f"HTTP {self.status_code}: {self.message}" - -class InvalidSessionTokenException(HttpException): - """Provided session token was invalid (invalid_grant).""" - -class InvalidOAuthConfigurationException(HttpException): - """The OAuth scopes are invalid.""" - class NoDevicesFoundException(Exception): """No devices were found for the account.""" + class InputValidationError(Exception): """Input Validation Failed.""" + value: object error_key: str @@ -40,11 +22,13 @@ def __init__(self, value: object) -> None: super().__init__(f"{self.__doc__} Received value: {value}") self.value = value + class BedtimeOutOfRangeError(InputValidationError): """Bedtime is outside of the allowed range.""" error_key = RangeErrorKeys.BEDTIME + class DailyPlaytimeOutOfRangeError(InputValidationError): """Daily playtime is outside of the allowed range.""" diff --git a/pynintendoparental/player.py b/pynintendoparental/player.py index 58d3ba4..af115e2 100644 --- a/pynintendoparental/player.py +++ b/pynintendoparental/player.py @@ -2,8 +2,10 @@ from .const import _LOGGER + class Player: """Defines a single player on a Nintendo device.""" + def __init__(self): """Init a player.""" self.player_image: str = None @@ -25,7 +27,7 @@ def update_from_daily_summary(self, raw: list[dict]): break @classmethod - def from_device_daily_summary(cls, raw: list[dict]) -> list['Player']: + def from_device_daily_summary(cls, raw: list[dict]) -> list["Player"]: """Converts a daily summary response into a list of players.""" players = [] _LOGGER.debug("Building players from device daily summary.") diff --git a/pynintendoparental/utils.py b/pynintendoparental/utils.py index 9dec6d2..cce6439 100644 --- a/pynintendoparental/utils.py +++ b/pynintendoparental/utils.py @@ -2,6 +2,7 @@ import inspect + def is_awaitable(func): """Check if a function is awaitable or not.""" return inspect.iscoroutinefunction(func) or inspect.isasyncgenfunction(func) diff --git a/requirements.txt b/requirements.txt index 276cbb3..ea7c3d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ aiohttp -python-dotenv \ No newline at end of file +python-dotenv +pynintendoauth \ No newline at end of file diff --git a/setup.py b/setup.py index e52c33b..248e4ea 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ REQUIREMENTS = [ # Add your list of production dependencies here, eg: # 'requests == 2.*', + 'pynintendoauth~=1.0.0' ] DEV_REQUIREMENTS = [ diff --git a/test.py b/test.py index 82f900c..15ffc31 100644 --- a/test.py +++ b/test.py @@ -1,33 +1,37 @@ - import os import logging import asyncio -from datetime import time - from dotenv import load_dotenv +from pynintendoauth.exceptions import InvalidSessionTokenException from pynintendoparental import Authenticator, NintendoParental -from pynintendoparental.exceptions import InvalidSessionTokenException from pynintendoparental.enum import DeviceTimerMode load_dotenv() _LOGGER = logging.getLogger(__name__) + async def main(): """Running function""" login = True while login: try: - if bool(int(os.environ.get("USE_SESSION_TOKEN", 0))) or input("Should we use a session token? [N/y] ").upper() == "Y": - auth = await Authenticator.complete_login(None, os.environ.get("SESSION_TOKEN") or input("Token: "), True) + if ( + bool(int(os.environ.get("USE_SESSION_TOKEN", 0))) + or input("Should we use a session token? [N/y] ").upper() == "Y" + ): + auth = Authenticator( + session_token=os.environ.get("SESSION_TOKEN") or input("Token: ") + ) + await auth.async_complete_login(use_session_token=True) else: - auth = Authenticator.generate_login() + auth = Authenticator() _LOGGER.info("Login using %s", auth.login_url) - auth = await Authenticator.complete_login(auth, input("Response URL: "), False) + await auth.async_complete_login(input("Response URL: ")) _LOGGER.info("Logged in, ready.") _LOGGER.debug("Access token is: %s", auth.access_token) - _LOGGER.debug("Session token is: %s", auth.get_session_token) + _LOGGER.debug("Session token is: %s", auth.session_token) control = await NintendoParental.create(auth) login = False except InvalidSessionTokenException as err: @@ -37,7 +41,9 @@ async def main(): while True: for device in control.devices.values(): - _LOGGER.debug("Discovered device %s, label %s", device.device_id, device.name) + _LOGGER.debug( + "Discovered device %s, label %s", device.device_id, device.name + ) _LOGGER.debug("Usage today %s", device.today_playing_time) _LOGGER.debug("Usage remaining %s", device.today_time_remaining) await device.set_timer_mode(DeviceTimerMode.EACH_DAY_OF_THE_WEEK) @@ -47,6 +53,7 @@ async def main(): _LOGGER.debug("pong") await control.update() + if __name__ == "__main__": logging.basicConfig( level=logging.DEBUG,