Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions homeassistant/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ async def async_disable_user_mfa(self, user: models.User,

await module.async_depose_user(user.id)

async def async_get_enabled_mfa(self, user: models.User) -> List[str]:
async def async_get_enabled_mfa(self, user: models.User) -> Dict[str, str]:
"""List enabled mfa modules for user."""
module_ids = []
modules = OrderedDict() # type: Dict[str, str]
for module_id, module in self._mfa_modules.items():
if await module.async_is_user_setup(user.id):
module_ids.append(module_id)
return module_ids
modules[module_id] = module.name
return modules

async def async_create_refresh_token(self, user: models.User,
client_id: Optional[str] = None) \
Expand Down
212 changes: 212 additions & 0 deletions homeassistant/auth/mfa_modules/totp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
"""Time-based One Time Password auth module."""
import logging
from io import BytesIO
from typing import Any, Dict, Optional, Tuple # noqa: F401

import voluptuous as vol

from homeassistant.auth.models import User
from homeassistant.core import HomeAssistant

from . import MultiFactorAuthModule, MULTI_FACTOR_AUTH_MODULES, \
MULTI_FACTOR_AUTH_MODULE_SCHEMA, SetupFlow

REQUIREMENTS = ['pyotp==2.2.6', 'PyQRCode==1.2.1']

CONFIG_SCHEMA = MULTI_FACTOR_AUTH_MODULE_SCHEMA.extend({
}, extra=vol.PREVENT_EXTRA)

STORAGE_VERSION = 1
STORAGE_KEY = 'auth_module.totp'
STORAGE_USERS = 'users'
STORAGE_USER_ID = 'user_id'
STORAGE_OTA_SECRET = 'ota_secret'

INPUT_FIELD_CODE = 'code'

DUMMY_SECRET = 'FPPTH34D4E3MI2HG'

_LOGGER = logging.getLogger(__name__)


def _generate_qr_code(data: str) -> str:
"""Generate a base64 PNG string represent QR Code image of data."""
import pyqrcode

qr_code = pyqrcode.create(data)

with BytesIO() as buffer:
qr_code.svg(file=buffer, scale=4)
return '{}'.format(
buffer.getvalue().decode("ascii").replace('\n', '')
.replace('<?xml version="1.0" encoding="UTF-8"?>'
'<svg xmlns="http://www.w3.org/2000/svg"', '<svg')
)


def _generate_secret_and_qr_code(username: str) -> Tuple[str, str, str]:
"""Generate a secret, url, and QR code."""
import pyotp

ota_secret = pyotp.random_base32()
url = pyotp.totp.TOTP(ota_secret).provisioning_uri(
username, issuer_name="Home Assistant")
image = _generate_qr_code(url)
return ota_secret, url, image


@MULTI_FACTOR_AUTH_MODULES.register('totp')
class TotpAuthModule(MultiFactorAuthModule):
"""Auth module validate time-based one time password."""

DEFAULT_TITLE = 'Time-based One Time Password'

def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
"""Initialize the user data store."""
super().__init__(hass, config)
self._users = None # type: Optional[Dict[str, str]]
self._user_store = hass.helpers.storage.Store(
STORAGE_VERSION, STORAGE_KEY)

@property
def input_schema(self) -> vol.Schema:
"""Validate login flow input data."""
return vol.Schema({INPUT_FIELD_CODE: str})

async def _async_load(self) -> None:
"""Load stored data."""
data = await self._user_store.async_load()

if data is None:
data = {STORAGE_USERS: {}}

self._users = data.get(STORAGE_USERS, {})

async def _async_save(self) -> None:
"""Save data."""
await self._user_store.async_save({STORAGE_USERS: self._users})

def _add_ota_secret(self, user_id: str,
secret: Optional[str] = None) -> str:
"""Create a ota_secret for user."""
import pyotp

ota_secret = secret or pyotp.random_base32() # type: str

self._users[user_id] = ota_secret # type: ignore
return ota_secret

async def async_setup_flow(self, user_id: str) -> SetupFlow:
"""Return a data entry flow handler for setup module.

Mfa module should extend SetupFlow
"""
user = await self.hass.auth.async_get_user(user_id) # type: ignore
return TotpSetupFlow(self, self.input_schema, user)

async def async_setup_user(self, user_id: str, setup_data: Any) -> str:
"""Set up auth module for user."""
if self._users is None:
await self._async_load()

result = await self.hass.async_add_executor_job(
self._add_ota_secret, user_id, setup_data.get('secret'))

await self._async_save()
return result

async def async_depose_user(self, user_id: str) -> None:
"""Depose auth module for user."""
if self._users is None:
await self._async_load()

if self._users.pop(user_id, None): # type: ignore
await self._async_save()

async def async_is_user_setup(self, user_id: str) -> bool:
"""Return whether user is setup."""
if self._users is None:
await self._async_load()

return user_id in self._users # type: ignore

async def async_validation(
self, user_id: str, user_input: Dict[str, Any]) -> bool:
"""Return True if validation passed."""
if self._users is None:
await self._async_load()

# user_input has been validate in caller
return await self.hass.async_add_executor_job(
self._validate_2fa, user_id, user_input[INPUT_FIELD_CODE])

def _validate_2fa(self, user_id: str, code: str) -> bool:
"""Validate two factor authentication code."""
import pyotp

ota_secret = self._users.get(user_id) # type: ignore
if ota_secret is None:
# even we cannot find user, we still do verify
# to make timing the same as if user was found.
pyotp.TOTP(DUMMY_SECRET).verify(code)
return False

return bool(pyotp.TOTP(ota_secret).verify(code))


class TotpSetupFlow(SetupFlow):
"""Handler for the setup flow."""

def __init__(self, auth_module: TotpAuthModule,
setup_schema: vol.Schema,
user: User) -> None:
"""Initialize the setup flow."""
super().__init__(auth_module, setup_schema, user.id)
# to fix typing complaint
self._auth_module = auth_module # type: TotpAuthModule
self._user = user
self._ota_secret = None # type: Optional[str]
self._url = None # type Optional[str]
self._image = None # type Optional[str]

async def async_step_init(
self, user_input: Optional[Dict[str, str]] = None) \
-> Dict[str, Any]:
"""Handle the first step of setup flow.

Return self.async_show_form(step_id='init') if user_input == None.
Return self.async_create_entry(data={'result': result}) if finish.
"""
import pyotp

errors = {} # type: Dict[str, str]

if user_input:
verified = await self.hass.async_add_executor_job( # type: ignore
pyotp.TOTP(self._ota_secret).verify, user_input['code'])
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't self._auth_module.async_validation be used here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is validate against a temp secret has not been saved yet.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if verified:
result = await self._auth_module.async_setup_user(
self._user_id, {'secret': self._ota_secret})
return self.async_create_entry(
title=self._auth_module.name,
data={'result': result}
)

errors['base'] = 'invalid_code'

else:
hass = self._auth_module.hass
self._ota_secret, self._url, self._image = \
await hass.async_add_executor_job( # type: ignore
_generate_secret_and_qr_code, str(self._user.name))

return self.async_show_form(
step_id='init',
data_schema=self._setup_schema,
description_placeholders={
'code': self._ota_secret,
'url': self._url,
'qr_code': self._image
},
errors=errors
)
4 changes: 2 additions & 2 deletions homeassistant/auth/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def __init__(self, auth_provider: AuthProvider) -> None:
self._auth_provider = auth_provider
self._auth_module_id = None # type: Optional[str]
self._auth_manager = auth_provider.hass.auth # type: ignore
self.available_mfa_modules = [] # type: List
self.available_mfa_modules = {} # type: Dict[str, str]
self.created_at = dt_util.utcnow()
self.user = None # type: Optional[User]

Expand Down Expand Up @@ -196,7 +196,7 @@ async def async_step_select_mfa_module(
errors['base'] = 'invalid_auth_module'

if len(self.available_mfa_modules) == 1:
self._auth_module_id = self.available_mfa_modules[0]
self._auth_module_id = list(self.available_mfa_modules.keys())[0]
return await self.async_step_mfa()

return self.async_show_form(
Expand Down
16 changes: 16 additions & 0 deletions homeassistant/components/auth/.translations/en.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"mfa_setup": {
"totp": {
"error": {
"invalid_code": "Invalid code, please try again. If you get this error consistently, please make sure the clock on Home Assistant system is accurate."
},
"step": {
"init": {
"description": "Scan the QR code with your authentication app, such as **Google Authenticator** or **Authy**. If you have problem to scan the QR code, using **`{code}`** to manual setup. \n\n{qr_code}\n\nEnter the six digi code appeared in your app below to verify the setup:",
"title": "Scan this QR code with your app"
}
},
"title": "TOTP"
}
}
}
16 changes: 16 additions & 0 deletions homeassistant/components/auth/strings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"mfa_setup":{
"totp": {
"title": "TOTP",
"step": {
"init": {
"title": "Set up two-factor authentication using TOTP",
"description": "To activate two factor authentication using time-based one-time passwords, scan the QR code with your authentication app. If you don't have one, we recommend either [Google Authenticator](https://support.google.com/accounts/answer/1066447) or [Authy](https://authy.com/).\n\n{qr_code}\n\nAfter scanning the code, enter the six digit code from your app to verify the setup. If you have problems scanning the QR code, do a manual setup with code **`{code}`**."
}
},
"error": {
"invalid_code": "Invalid code, please try again. If you get this error consistently, please make sure the clock of your Home Assistant system is accurate."
}
}
}
}
6 changes: 5 additions & 1 deletion homeassistant/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,14 @@ async def async_process_ha_core_config(
if has_api_password:
auth_conf.append({'type': 'legacy_api_password'})

mfa_conf = config.get(CONF_AUTH_MFA_MODULES, [
{'type': 'totp', 'id': 'totp', 'name': 'Authenticator app'}
])

setattr(hass, 'auth', await auth.auth_manager_from_config(
hass,
auth_conf,
config.get(CONF_AUTH_MFA_MODULES, [])))
mfa_conf))

hac = hass.config

Expand Down
4 changes: 4 additions & 0 deletions requirements_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ PyMVGLive==1.1.4
# homeassistant.components.arduino
PyMata==2.14

# homeassistant.auth.mfa_modules.totp
PyQRCode==1.2.1

# homeassistant.components.sensor.rmvtransport
PyRMVtransport==0.0.7

Expand Down Expand Up @@ -985,6 +988,7 @@ pyopenuv==1.0.1
# homeassistant.components.iota
pyota==2.0.5

# homeassistant.auth.mfa_modules.totp
# homeassistant.components.sensor.otp
pyotp==2.2.6

Expand Down
4 changes: 4 additions & 0 deletions requirements_test_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ pymonoprice==0.3
# homeassistant.components.binary_sensor.nx584
pynx584==0.4

# homeassistant.auth.mfa_modules.totp
# homeassistant.components.sensor.otp
pyotp==2.2.6

# homeassistant.components.qwikswitch
pyqwikswitch==0.8

Expand Down
1 change: 1 addition & 0 deletions script/gen_requirements_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
'pylitejet',
'pymonoprice',
'pynx584',
'pyotp',
'pyqwikswitch',
'PyRMVtransport',
'python-forecastio',
Expand Down
Loading