Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions homeassistant/auth/providers/homeassistant.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
import hmac
from typing import Any, Dict, List, Optional, cast

import bcrypt
import voluptuous as vol

from homeassistant.const import CONF_ID
from homeassistant.core import callback, HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.util.async_ import run_coroutine_threadsafe

from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow

from ..models import Credentials, UserMeta
from ..util import generate_secret

Expand Down Expand Up @@ -74,8 +77,7 @@ def validate_login(self, username: str, password: str) -> None:

Raises InvalidAuth if auth invalid.
"""
hashed = self.hash_password(password)

dummy = b'$2b$12$CiuFGszHx9eNHxPuQcwBWez4CwDTOcLTX5CbOpV6gef2nYuXkY7BO'
found = None

# Compare all users to avoid timing attacks.
Expand All @@ -84,22 +86,55 @@ def validate_login(self, username: str, password: str) -> None:
found = user

if found is None:
# Do one more compare to make timing the same as if user was found.
hmac.compare_digest(hashed, hashed)
# check a hash to make timing the same as if user was found
bcrypt.checkpw(b'foo',
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need a migration path. We can only do the migration while a user is logging in. Verify with old pattern -> if ok, migrate to new salt.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

dummy)
raise InvalidAuth

if not hmac.compare_digest(hashed,
base64.b64decode(found['password'])):
user_hash = base64.b64decode(found['password'])

# if the hash is not a bcrypt hash...
# provide a transparant upgrade for old pbkdf2 hash format
if not (user_hash.startswith(b'$2a$')
or user_hash.startswith(b'$2b$')
or user_hash.startswith(b'$2x$')
or user_hash.startswith(b'$2y$')):
# IMPORTANT! validate the login, bail if invalid
hashed = self.legacy_hash_password(password)
if not hmac.compare_digest(hashed, user_hash):
raise InvalidAuth
# then re-hash the valid password with bcrypt
self.change_password(found['username'], password)
run_coroutine_threadsafe(
self.async_save(), self.hass.loop
).result()
user_hash = base64.b64decode(found['password'])

# bcrypt.checkpw is timing-safe
if not bcrypt.checkpw(password.encode(),
user_hash):
raise InvalidAuth

def hash_password(self, password: str, for_storage: bool = False) -> bytes:
"""Encode a password."""
def legacy_hash_password(self, password: str,
for_storage: bool = False) -> bytes:
"""LEGACY password encoding."""
# We're no longer storing salts in data, but if one exists we
# should be able to retrieve it.
salt = self._data['salt'].encode() # type: ignore
hashed = hashlib.pbkdf2_hmac('sha512', password.encode(), salt, 100000)
if for_storage:
hashed = base64.b64encode(hashed)
return hashed

# pylint: disable=no-self-use
def hash_password(self, password: str, for_storage: bool = False) -> bytes:
"""Encode a password."""
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt(rounds=12)) \
# type: bytes
if for_storage:
hashed = base64.b64encode(hashed)
return hashed

def add_auth(self, username: str, password: str) -> None:
"""Add a new authenticated user/pass."""
if any(user['username'] == username for user in self.users):
Expand Down
1 change: 1 addition & 0 deletions homeassistant/package_constraints.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ aiohttp==3.4.0
astral==1.6.1
async_timeout==3.0.0
attrs==18.1.0
bcrypt==3.1.4
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So one downside of using bcrypt is that it requires an external dependency. They are providing wheels for Windows, Mac and Linux, so that's good:

image

For Python 3.7 there are currently only wheels for Windows. Windows users have the most difficulty building stuff, so that's good.

Because it's part of the PYCA org, I think I would trust it to remain maintained.

certifi>=2018.04.16
jinja2>=2.10
PyJWT==1.6.4
Expand Down
1 change: 1 addition & 0 deletions requirements_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ aiohttp==3.4.0
astral==1.6.1
async_timeout==3.0.0
attrs==18.1.0
bcrypt==3.1.4
certifi>=2018.04.16
jinja2>=2.10
PyJWT==1.6.4
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
'astral==1.6.1',
'async_timeout==3.0.0',
'attrs==18.1.0',
'bcrypt==3.1.4',
'certifi>=2018.04.16',
'jinja2>=2.10',
'PyJWT==1.6.4',
Expand Down
89 changes: 89 additions & 0 deletions tests/auth/providers/test_homeassistant.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Test the Home Assistant local auth provider."""
from unittest.mock import Mock

import base64
import pytest

from homeassistant import data_entry_flow
Expand Down Expand Up @@ -132,3 +133,91 @@ async def test_new_users_populate_values(hass, data):
user = await manager.async_get_or_create_user(credentials)
assert user.name == 'hello'
assert user.is_active


async def test_new_hashes_are_bcrypt(data, hass):
"""Test that newly created hashes are using bcrypt."""
data.add_auth('newuser', 'newpass')
found = None
for user in data.users:
if user['username'] == 'newuser':
found = user
assert found is not None
user_hash = base64.b64decode(found['password'])
assert (user_hash.startswith(b'$2a$')
or user_hash.startswith(b'$2b$')
or user_hash.startswith(b'$2x$')
or user_hash.startswith(b'$2y$'))


async def test_pbkdf2_to_bcrypt_hash_upgrade(hass_storage, hass):
"""Test migrating user from pbkdf2 hash to bcrypt hash."""
hass_storage[hass_auth.STORAGE_KEY] = {
'version': hass_auth.STORAGE_VERSION,
'key': hass_auth.STORAGE_KEY,
'data': {
'salt': '09c52f0b120eaa7dea5f73f9a9b985f3d493b30a08f3f2945ef613'
'0b08e6a3ea',
'users': [
{
'password': 'L5PAbehB8LAQI2Ixu+d+PDNJKmljqLnBcYWYw35onC/8D'
'BM1SpvT6A8ZFael5+deCt+s+43J08IcztnguouHSw==',
'username': 'legacyuser'
}
]
},
}
data = hass_auth.Data(hass)
await data.async_load()

# verify the correct (pbkdf2) password successfuly authenticates the user
await hass.async_add_executor_job(
data.validate_login, 'legacyuser', 'beer')

# ...and that the hashes are now bcrypt hashes
user_hash = base64.b64decode(
hass_storage[hass_auth.STORAGE_KEY]['data']['users'][0]['password'])
assert (user_hash.startswith(b'$2a$')
or user_hash.startswith(b'$2b$')
or user_hash.startswith(b'$2x$')
or user_hash.startswith(b'$2y$'))


async def test_pbkdf2_to_bcrypt_hash_upgrade_with_incorrect_pass(hass_storage,
hass):
"""Test migrating user from pbkdf2 hash to bcrypt hash."""
hass_storage[hass_auth.STORAGE_KEY] = {
'version': hass_auth.STORAGE_VERSION,
'key': hass_auth.STORAGE_KEY,
'data': {
'salt': '09c52f0b120eaa7dea5f73f9a9b985f3d493b30a08f3f2945ef613'
'0b08e6a3ea',
'users': [
{
'password': 'L5PAbehB8LAQI2Ixu+d+PDNJKmljqLnBcYWYw35onC/8D'
'BM1SpvT6A8ZFael5+deCt+s+43J08IcztnguouHSw==',
'username': 'legacyuser'
}
]
},
}
data = hass_auth.Data(hass)
await data.async_load()

orig_user_hash = base64.b64decode(
hass_storage[hass_auth.STORAGE_KEY]['data']['users'][0]['password'])

# Make sure invalid legacy passwords fail
with pytest.raises(hass_auth.InvalidAuth):
await hass.async_add_executor_job(
data.validate_login, 'legacyuser', 'wine')

# Make sure we don't change the password/hash when password is incorrect
with pytest.raises(hass_auth.InvalidAuth):
await hass.async_add_executor_job(
data.validate_login, 'legacyuser', 'wine')

same_user_hash = base64.b64decode(
hass_storage[hass_auth.STORAGE_KEY]['data']['users'][0]['password'])

assert orig_user_hash == same_user_hash