Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions homeassistant/auth/providers/homeassistant.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
"""Home Assistant auth provider."""
import base64
from collections import OrderedDict
import hashlib
import hmac
from typing import Any, Dict, List, Optional, cast

import bcrypt
Expand All @@ -11,7 +9,6 @@
from homeassistant.const import CONF_ID
from homeassistant.core import callback, HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.util.async_ import run_coroutine_threadsafe

from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow

Expand Down Expand Up @@ -94,39 +91,11 @@ def validate_login(self, username: str, password: str) -> None:

user_hash = base64.b64decode(found['password'])

# if the hash is not a bcrypt hash...
# provide a transparant upgrade for old pbkdf2 hash format
if not (user_hash.startswith(b'$2a$')
or user_hash.startswith(b'$2b$')
or user_hash.startswith(b'$2x$')
or user_hash.startswith(b'$2y$')):
# IMPORTANT! validate the login, bail if invalid
hashed = self.legacy_hash_password(password)
if not hmac.compare_digest(hashed, user_hash):
raise InvalidAuth
# then re-hash the valid password with bcrypt
self.change_password(found['username'], password)
run_coroutine_threadsafe(
self.async_save(), self.hass.loop
).result()
user_hash = base64.b64decode(found['password'])

# bcrypt.checkpw is timing-safe
if not bcrypt.checkpw(password.encode(),
user_hash):
raise InvalidAuth

def legacy_hash_password(self, password: str,
for_storage: bool = False) -> bytes:
"""LEGACY password encoding."""
# We're no longer storing salts in data, but if one exists we
# should be able to retrieve it.
salt = self._data['salt'].encode() # type: ignore
hashed = hashlib.pbkdf2_hmac('sha512', password.encode(), salt, 100000)
if for_storage:
hashed = base64.b64encode(hashed)
return hashed

# pylint: disable=no-self-use
def hash_password(self, password: str, for_storage: bool = False) -> bytes:
"""Encode a password."""
Expand Down
89 changes: 0 additions & 89 deletions tests/auth/providers/test_homeassistant.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Test the Home Assistant local auth provider."""
from unittest.mock import Mock

import base64
import pytest
import voluptuous as vol

Expand Down Expand Up @@ -134,91 +133,3 @@ async def test_new_users_populate_values(hass, data):
user = await manager.async_get_or_create_user(credentials)
assert user.name == 'hello'
assert user.is_active


async def test_new_hashes_are_bcrypt(data, hass):
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could leave this test in.

"""Test that newly created hashes are using bcrypt."""
data.add_auth('newuser', 'newpass')
found = None
for user in data.users:
if user['username'] == 'newuser':
found = user
assert found is not None
user_hash = base64.b64decode(found['password'])
assert (user_hash.startswith(b'$2a$')
or user_hash.startswith(b'$2b$')
or user_hash.startswith(b'$2x$')
or user_hash.startswith(b'$2y$'))


async def test_pbkdf2_to_bcrypt_hash_upgrade(hass_storage, hass):
"""Test migrating user from pbkdf2 hash to bcrypt hash."""
hass_storage[hass_auth.STORAGE_KEY] = {
'version': hass_auth.STORAGE_VERSION,
'key': hass_auth.STORAGE_KEY,
'data': {
'salt': '09c52f0b120eaa7dea5f73f9a9b985f3d493b30a08f3f2945ef613'
'0b08e6a3ea',
'users': [
{
'password': 'L5PAbehB8LAQI2Ixu+d+PDNJKmljqLnBcYWYw35onC/8D'
'BM1SpvT6A8ZFael5+deCt+s+43J08IcztnguouHSw==',
'username': 'legacyuser'
}
]
},
}
data = hass_auth.Data(hass)
await data.async_load()

# verify the correct (pbkdf2) password successfuly authenticates the user
await hass.async_add_executor_job(
data.validate_login, 'legacyuser', 'beer')

# ...and that the hashes are now bcrypt hashes
user_hash = base64.b64decode(
hass_storage[hass_auth.STORAGE_KEY]['data']['users'][0]['password'])
assert (user_hash.startswith(b'$2a$')
or user_hash.startswith(b'$2b$')
or user_hash.startswith(b'$2x$')
or user_hash.startswith(b'$2y$'))


async def test_pbkdf2_to_bcrypt_hash_upgrade_with_incorrect_pass(hass_storage,
hass):
"""Test migrating user from pbkdf2 hash to bcrypt hash."""
hass_storage[hass_auth.STORAGE_KEY] = {
'version': hass_auth.STORAGE_VERSION,
'key': hass_auth.STORAGE_KEY,
'data': {
'salt': '09c52f0b120eaa7dea5f73f9a9b985f3d493b30a08f3f2945ef613'
'0b08e6a3ea',
'users': [
{
'password': 'L5PAbehB8LAQI2Ixu+d+PDNJKmljqLnBcYWYw35onC/8D'
'BM1SpvT6A8ZFael5+deCt+s+43J08IcztnguouHSw==',
'username': 'legacyuser'
}
]
},
}
data = hass_auth.Data(hass)
await data.async_load()

orig_user_hash = base64.b64decode(
hass_storage[hass_auth.STORAGE_KEY]['data']['users'][0]['password'])

# Make sure invalid legacy passwords fail
with pytest.raises(hass_auth.InvalidAuth):
await hass.async_add_executor_job(
data.validate_login, 'legacyuser', 'wine')

# Make sure we don't change the password/hash when password is incorrect
with pytest.raises(hass_auth.InvalidAuth):
await hass.async_add_executor_job(
data.validate_login, 'legacyuser', 'wine')

same_user_hash = base64.b64decode(
hass_storage[hass_auth.STORAGE_KEY]['data']['users'][0]['password'])

assert orig_user_hash == same_user_hash