From 82f59088d6ae166fca9d296419c28dc9aabf703e Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Tue, 2 Apr 2019 15:24:52 -0600 Subject: [PATCH 1/3] Create decorator to check service permissions --- homeassistant/auth/permissions/util.py | 53 +++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/homeassistant/auth/permissions/util.py b/homeassistant/auth/permissions/util.py index d2d259fb32ee7d..cbb38c25ba5ec0 100644 --- a/homeassistant/auth/permissions/util.py +++ b/homeassistant/auth/permissions/util.py @@ -1,16 +1,67 @@ """Helpers to deal with permissions.""" from functools import wraps -from typing import Callable, Dict, List, Optional, Union, cast # noqa: F401 +from typing import ( + TYPE_CHECKING, Callable, Dict, List, Optional, Union, cast) # noqa: F401 +from homeassistant.exceptions import Unauthorized, UnknownUser + +from .const import POLICY_CONTROL from .models import PermissionLookup from .types import CategoryType, SubCategoryDict, ValueType +if TYPE_CHECKING: + from homeassistant.core import HomeAssistant, Service, ServiceCall # noqa + LookupFunc = Callable[[PermissionLookup, SubCategoryDict, str], Optional[ValueType]] SubCatLookupType = Dict[str, LookupFunc] +def authorized_service_call(hass: 'HomeAssistant', domain: str) -> Callable: + """Ensure user of a config entry-enabled service call has permission.""" + def decorator(service: 'Service') -> Callable: + """Decorate.""" + @wraps(service) + async def check_permissions(call: 'ServiceCall') -> None: + """Check user permission and raise before call if unauthorized.""" + if not call.context.user_id: + return + + user = await hass.auth.async_get_user(call.context.user_id) + if user is None: + raise UnknownUser( + context=call.context, + permission=POLICY_CONTROL + ) + + # If the user passes one or more entity IDs, check permissions + # there; otherwise, check permissions against entities registered + # to the domain: + if call.data.get('entity_id'): + if isinstance(call.data['entity'], str): + entities = [call.data['entity_id']] + else: + entities = call.data['entity_id'] + else: + reg = await hass.helpers.entity_registry.async_get_registry() + entities = [ + entity.entity_id for entity in reg.entities.values() + if entity.platform == domain + ] + + for entity_id in entities: + if user.permissions.check_entity(entity_id, POLICY_CONTROL): + return await service(call) + + raise Unauthorized( + context=call.context, + permission=POLICY_CONTROL, + ) + return check_permissions + return decorator + + def lookup_all(perm_lookup: PermissionLookup, lookup_dict: SubCategoryDict, object_id: str) -> ValueType: """Look up permission for all.""" From bfc9deb9febfec13c6129c3cd9ed0669179fd704 Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Tue, 2 Apr 2019 15:56:03 -0600 Subject: [PATCH 2/3] Ensure OpenUV service has proper user permissions --- homeassistant/components/openuv/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/homeassistant/components/openuv/__init__.py b/homeassistant/components/openuv/__init__.py index 8e8401bbeac36c..63d2744cd4df0d 100644 --- a/homeassistant/components/openuv/__init__.py +++ b/homeassistant/components/openuv/__init__.py @@ -11,6 +11,7 @@ from homeassistant.helpers import aiohttp_client, config_validation as cv from homeassistant.helpers.dispatcher import async_dispatcher_send from homeassistant.helpers.entity import Entity +from homeassistant.helpers.service import verify_domain_control from .config_flow import configured_instances from .const import DOMAIN @@ -130,6 +131,8 @@ async def async_setup_entry(hass, config_entry): from pyopenuv import Client from pyopenuv.errors import OpenUvError + _verify_domain_control = verify_domain_control(hass, DOMAIN) + try: websession = aiohttp_client.async_get_clientsession(hass) openuv = OpenUV( @@ -155,6 +158,7 @@ async def async_setup_entry(hass, config_entry): hass.config_entries.async_forward_entry_setup( config_entry, component)) + @_verify_domain_control async def update_data(service): """Refresh OpenUV data.""" _LOGGER.debug('Refreshing OpenUV data') From 553e1c6aaf9825ff0b7220f82fa78db4d3363bd9 Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Sat, 13 Apr 2019 14:54:20 -0600 Subject: [PATCH 3/3] Reverting strange changes --- homeassistant/auth/permissions/util.py | 53 +------------------------- 1 file changed, 1 insertion(+), 52 deletions(-) diff --git a/homeassistant/auth/permissions/util.py b/homeassistant/auth/permissions/util.py index cbb38c25ba5ec0..d2d259fb32ee7d 100644 --- a/homeassistant/auth/permissions/util.py +++ b/homeassistant/auth/permissions/util.py @@ -1,67 +1,16 @@ """Helpers to deal with permissions.""" from functools import wraps -from typing import ( - TYPE_CHECKING, Callable, Dict, List, Optional, Union, cast) # noqa: F401 +from typing import Callable, Dict, List, Optional, Union, cast # noqa: F401 -from homeassistant.exceptions import Unauthorized, UnknownUser - -from .const import POLICY_CONTROL from .models import PermissionLookup from .types import CategoryType, SubCategoryDict, ValueType -if TYPE_CHECKING: - from homeassistant.core import HomeAssistant, Service, ServiceCall # noqa - LookupFunc = Callable[[PermissionLookup, SubCategoryDict, str], Optional[ValueType]] SubCatLookupType = Dict[str, LookupFunc] -def authorized_service_call(hass: 'HomeAssistant', domain: str) -> Callable: - """Ensure user of a config entry-enabled service call has permission.""" - def decorator(service: 'Service') -> Callable: - """Decorate.""" - @wraps(service) - async def check_permissions(call: 'ServiceCall') -> None: - """Check user permission and raise before call if unauthorized.""" - if not call.context.user_id: - return - - user = await hass.auth.async_get_user(call.context.user_id) - if user is None: - raise UnknownUser( - context=call.context, - permission=POLICY_CONTROL - ) - - # If the user passes one or more entity IDs, check permissions - # there; otherwise, check permissions against entities registered - # to the domain: - if call.data.get('entity_id'): - if isinstance(call.data['entity'], str): - entities = [call.data['entity_id']] - else: - entities = call.data['entity_id'] - else: - reg = await hass.helpers.entity_registry.async_get_registry() - entities = [ - entity.entity_id for entity in reg.entities.values() - if entity.platform == domain - ] - - for entity_id in entities: - if user.permissions.check_entity(entity_id, POLICY_CONTROL): - return await service(call) - - raise Unauthorized( - context=call.context, - permission=POLICY_CONTROL, - ) - return check_permissions - return decorator - - def lookup_all(perm_lookup: PermissionLookup, lookup_dict: SubCategoryDict, object_id: str) -> ValueType: """Look up permission for all."""