Skip to content

Commit

Permalink
- Minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
afabiani committed Apr 6, 2022
1 parent d9b41c1 commit ebfdad0
Showing 1 changed file with 15 additions and 38 deletions.
53 changes: 15 additions & 38 deletions geonode/security/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,12 @@ def set_permissions(uuid: str, /, instance=None, owner: settings.AUTH_USER_MODEL
_owner = _resource.owner
_resource_type = _resource.resource_type or _resource.polymorphic_ctype.name

# default permissions for anonymous users
anonymous_group, _ = Group.objects.get_or_create(name='anonymous')

if not anonymous_group:
raise Exception("Could not acquire 'anonymous' Group.")

# Gathering and validating the current permissions (if any has been passed)
if not created and permissions is None:
permissions = _resource.get_all_level_info()
Expand Down Expand Up @@ -845,8 +851,11 @@ def set_permissions(uuid: str, /, instance=None, owner: settings.AUTH_USER_MODEL
# Anonymous User group
if 'users' in _perm_spec and ("AnonymousUser" in _perm_spec['users'] or get_anonymous_user() in _perm_spec['users']):
anonymous_user = "AnonymousUser" if "AnonymousUser" in _perm_spec['users'] else get_anonymous_user()
anonymous_group = Group.objects.get(name='anonymous')
for perm in _perm_spec['users'][anonymous_user]:
perms = copy.deepcopy(_perm_spec['users'][anonymous_user])
_perm_spec['users'].pop(anonymous_user)
_prev_perm = _perm_spec["groups"].get(anonymous_group, []) if "groups" in _perm_spec else []
_perm_spec["groups"][anonymous_group] = set.union(perms_as_set(_prev_perm), perms_as_set(perms))
for perm in _perm_spec["groups"][anonymous_group]:
if _resource_type == 'layer' and perm in (
'change_layer_data', 'change_layer_style',
'add_layer', 'change_layer', 'delete_layer'):
Expand Down Expand Up @@ -902,60 +911,28 @@ def set_permissions(uuid: str, /, instance=None, owner: settings.AUTH_USER_MODEL
except Permission.DoesNotExist as e:
logger.exception(e)
logger.exception(f"Permissions {perm} does not exists for resource {_resource.get_self_resource()}")

# AnonymousUser
if 'users' in _perm_spec and len(_perm_spec['users']) > 0:
if "AnonymousUser" in _perm_spec['users'] or get_anonymous_user() in _perm_spec['users']:
_user = get_anonymous_user()
anonymous_user = "AnonymousUser" if "AnonymousUser" in _perm_spec['users'] else get_anonymous_user()
perms = _perm_spec['users'][anonymous_user]
for perm in perms:
if _resource_type == 'layer' and perm in (
'change_layer_data', 'change_layer_style',
'add_layer', 'change_layer', 'delete_layer'):
try:
assign_perm(perm, _user, _resource.layer)
except Permission.DoesNotExist as e:
logger.exception(e)
logger.exception(f"Permissions {perm} does not exists for resource {_resource.layer}")
elif AdvancedSecurityWorkflowManager.assignable_perm_condition(perm, _resource_type):
try:
assign_perm(perm, _user, _resource.get_self_resource())
except Permission.DoesNotExist as e:
logger.exception(e)
logger.exception(f"Permissions {perm} does not exists for resource {_resource.get_self_resource()}")
else:
# default permissions for anonymous users
anonymous_group, created = Group.objects.get_or_create(name='anonymous')

if not anonymous_group:
raise Exception("Could not acquire 'anonymous' Group.")

# Anonymous
if AdvancedSecurityWorkflowManager.is_anonymous_can_view():
assign_perm('view_resourcebase',
anonymous_group, _resource.get_self_resource())
assign_perm('view_resourcebase', anonymous_group, _resource.get_self_resource())
_prev_perm = _perm_spec["groups"].get(anonymous_group, []) if "groups" in _perm_spec else []
_perm_spec["groups"][anonymous_group] = set.union(perms_as_set(_prev_perm), perms_as_set('view_resourcebase'))
else:
for user_group in get_user_groups(_owner):
if not skip_registered_members_common_group(user_group):
assign_perm('view_resourcebase',
user_group, _resource.get_self_resource())
assign_perm('view_resourcebase', user_group, _resource.get_self_resource())
_prev_perm = _perm_spec["groups"].get(user_group, []) if "groups" in _perm_spec else []
_perm_spec["groups"][user_group] = set.union(perms_as_set(_prev_perm), perms_as_set('view_resourcebase'))

if AdvancedSecurityWorkflowManager.assignable_perm_condition('download_resourcebase', _resource_type):
if AdvancedSecurityWorkflowManager.is_anonymous_can_download():
assign_perm('download_resourcebase',
anonymous_group, _resource.get_self_resource())
assign_perm('download_resourcebase', anonymous_group, _resource.get_self_resource())
_prev_perm = _perm_spec["groups"].get(anonymous_group, []) if "groups" in _perm_spec else []
_perm_spec["groups"][anonymous_group] = set.union(perms_as_set(_prev_perm), perms_as_set('download_resourcebase'))
else:
for user_group in get_user_groups(_owner):
if not skip_registered_members_common_group(user_group):
assign_perm('download_resourcebase',
user_group, _resource.get_self_resource())
assign_perm('download_resourcebase', user_group, _resource.get_self_resource())
_prev_perm = _perm_spec["groups"].get(user_group, []) if "groups" in _perm_spec else []
_perm_spec["groups"][user_group] = set.union(perms_as_set(_prev_perm), perms_as_set('download_resourcebase'))

Expand Down

0 comments on commit ebfdad0

Please sign in to comment.