Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/command_modules/azure-cli-role/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

Release History
===============
2.0.17
++++++
* role assignment: expose --assignee-object-id to bypass graph query

2.0.16
++++++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def load_arguments(self, _):
c.argument('show_all', options_list=['--all'], action='store_true', help='show all assignments under the current subscription')
c.argument('include_inherited', action='store_true', help='include assignments applied on parent scopes')
c.argument('assignee', help='represent a user, group, or service principal. supported format: object id, user sign-in name, or service principal name')
c.argument('assignee_object_id', help="assignee's graph object id, such as the 'principal id' from a managed service identity. Use this instead of '--assignee' to bypass graph permission issues")
c.argument('ids', nargs='+', help='space separated role assignment ids')
c.argument('include_classic_administrators', arg_type=get_three_state_flag(), help='list default role assignments for subscription classic administrators, aka co-admins')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from knack.log import get_logger
from knack.util import CLIError, todict

from msrestazure.azure_exceptions import CloudError
from azure.graphrbac.models.graph_error import GraphErrorException

from azure.cli.core.util import get_file_json, shell_safe_json_parse

from azure.mgmt.authorization.models import (RoleAssignmentProperties, Permission, RoleDefinition,
Expand All @@ -34,6 +37,8 @@

_CUSTOM_RULE = 'CustomRole'

# pylint: disable=too-many-lines
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is the indication that this file should be split into smaller ones.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my experience working with modules that did split their custom commands into multiple files, it simply added complexity.



def list_role_definitions(cmd, name=None, resource_group_name=None, scope=None,
custom_role_only=False):
Expand Down Expand Up @@ -79,7 +84,7 @@ def _create_update_role_definition(cli_ctx, role_definition, for_update):
role_name = matched[0].properties.role_name
role_id = matched[0].name
else:
role_id = uuid.uuid4()
role_id = _gen_guid()

if not for_update and 'assignableScopes' not in role_definition:
raise CLIError("please provide 'assignableScopes'")
Expand Down Expand Up @@ -118,8 +123,11 @@ def _search_role_definitions(definitions_client, name, scope, custom_role_only=F
return roles


def create_role_assignment(cmd, role, assignee, resource_group_name=None, scope=None):
return _create_role_assignment(cmd.cli_ctx, role, assignee, resource_group_name, scope)
def create_role_assignment(cmd, role, assignee=None, assignee_object_id=None, resource_group_name=None, scope=None):
if bool(assignee) == bool(assignee_object_id):
raise CLIError('usage error: --assignee STRING | --assignee-object-id GUID')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we couldn't simply accept a string or GUID for assignee? This seems like an Xplat anti-pattern.

Copy link
Contributor Author

@yugangw-msft yugangw-msft Jan 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is GUID can be used as assignee names, so w/o graph query, we would not know. For people login with service principal, they need to have the option to bypass the graph query.
I have proposed that on graph permission error cli can fallback to object id as long as it is a guid, but it is possible we could assign by mistake when service principal 2's object id happens to be the service principal 1's name.

return _create_role_assignment(cmd.cli_ctx, role, assignee or assignee_object_id,
resource_group_name, scope, resolve_assignee=(not assignee_object_id))


def _create_role_assignment(cli_ctx, role, assignee, resource_group_name=None, scope=None,
Expand All @@ -134,7 +142,7 @@ def _create_role_assignment(cli_ctx, role, assignee, resource_group_name=None, s
role_id = _resolve_role_id(role, scope, definitions_client)
object_id = _resolve_object_id(cli_ctx, assignee) if resolve_assignee else assignee
properties = RoleAssignmentProperties(role_id, object_id)
assignment_name = uuid.uuid4()
assignment_name = _gen_guid()
custom_headers = None
return assignments_client.create(scope, assignment_name, properties,
custom_headers=custom_headers)
Expand Down Expand Up @@ -184,13 +192,17 @@ def list_role_assignments(cmd, assignee=None, role=None, resource_group_name=Non
# fill in principal names
principal_ids = set(i['properties']['principalId'] for i in results if i['properties']['principalId'])
if principal_ids:
principals = _get_object_stubs(graph_client, principal_ids)
principal_dics = {i.object_id: _get_displayable_name(i) for i in principals}
try:
principals = _get_object_stubs(graph_client, principal_ids)
principal_dics = {i.object_id: _get_displayable_name(i) for i in principals}

for i in [r for r in results if not r['properties'].get('principalName')]:
i['properties']['principalName'] = ''
if principal_dics.get(i['properties']['principalId']):
i['properties']['principalName'] = principal_dics[i['properties']['principalId']]
for i in [r for r in results if not r['properties'].get('principalName')]:
i['properties']['principalName'] = ''
if principal_dics.get(i['properties']['principalId']):
i['properties']['principalName'] = principal_dics[i['properties']['principalId']]
except (CloudError, GraphErrorException) as ex:
# failure on resolving principal due to graph permission should not fail the whole thing
logger.info("Failed to resolve graph object information per error '%s'", ex)

return results

Expand All @@ -200,14 +212,12 @@ def _backfill_assignments_for_co_admins(cli_ctx, auth_client, assignee=None):
co_admins = [x for x in co_admins if x.properties.email_address]
graph_client = _graph_client_factory(cli_ctx)
if assignee: # apply assignee filter if applicable
try:
uuid.UUID(assignee)
if _is_guid(assignee):
result = _get_object_stubs(graph_client, [assignee])
if not result:
return []
assignee = _get_displayable_name(result[0]).lower()
except ValueError:
pass

co_admins = [x for x in co_admins if assignee == x.properties.email_address.lower()]

if not co_admins:
Expand Down Expand Up @@ -274,7 +284,7 @@ def _search_role_assignments(cli_ctx, assignments_client, definitions_client,
scope, assignee, role, include_inherited, include_groups):
assignee_object_id = None
if assignee:
assignee_object_id = _resolve_object_id(cli_ctx, assignee)
assignee_object_id = _resolve_object_id(cli_ctx, assignee, fallback_to_object_id=True)

# combining filters is unsupported, so we pick the best, and do limited maunal filtering
if assignee_object_id:
Expand Down Expand Up @@ -321,12 +331,9 @@ def _resolve_role_id(role, scope, definitions_client):
role, re.I):
role_id = role
else:
try:
uuid.UUID(role)
if _is_guid(role):
role_id = '/subscriptions/{}/providers/Microsoft.Authorization/roleDefinitions/{}'.format(
definitions_client.config.subscription_id, role)
except ValueError:
pass
if not role_id: # retrieve role id
role_defs = list(definitions_client.list(scope, "roleName eq '{}'".format(role)))
if not role_defs:
Expand Down Expand Up @@ -407,7 +414,6 @@ def create_application(client, display_name, homepage, identifier_uris,
available_to_other_tenants=False, password=None, reply_urls=None,
key_value=None, key_type=None, key_usage=None, start_date=None,
end_date=None):
from azure.graphrbac.models import GraphErrorException
password_creds, key_creds = _build_application_creds(password, key_value, key_type,
key_usage, start_date, end_date)

Expand Down Expand Up @@ -458,11 +464,10 @@ def delete_application(client, identifier):
def _resolve_application(client, identifier):
result = list(client.list(filter="identifierUris/any(s:s eq '{}')".format(identifier)))
if not result:
try:
uuid.UUID(identifier)
if _is_guid(identifier):
# it is either app id or object id, let us verify
result = list(client.list(filter="appId eq '{}'".format(identifier)))
except ValueError:
else:
raise CLIError("Application '{}' doesn't exist".format(identifier))

return result[0].object_id if result else identifier
Expand All @@ -489,9 +494,9 @@ def _build_application_creds(password=None, key_value=None, key_type=None,
password_creds = None
key_creds = None
if password:
password_creds = [PasswordCredential(start_date, end_date, str(uuid.uuid4()), password)]
password_creds = [PasswordCredential(start_date, end_date, str(_gen_guid()), password)]
elif key_value:
key_creds = [KeyCredential(start_date, end_date, key_value, str(uuid.uuid4()),
key_creds = [KeyCredential(start_date, end_date, key_value, str(_gen_guid()),
key_usage, key_type)]

return (password_creds, key_creds)
Expand All @@ -505,10 +510,9 @@ def _create_service_principal(cli_ctx, identifier, resolve_app=True):
client = _graph_client_factory(cli_ctx)

if resolve_app:
try:
uuid.UUID(identifier)
if _is_guid(identifier):
result = list(client.applications.list(filter="appId eq '{}'".format(identifier)))
except ValueError:
else:
result = list(client.applications.list(
filter="identifierUris/any(s:s eq '{}')".format(identifier)))

Expand Down Expand Up @@ -554,10 +558,9 @@ def _resolve_service_principal(client, identifier):
result = list(client.list(filter="servicePrincipalNames/any(c:c eq '{}')".format(identifier)))
if result:
return result[0].object_id
try:
uuid.UUID(identifier)
if _is_guid(identifier):
return identifier # assume an object id
except ValueError:
else:
raise CLIError("service principal '{}' doesn't exist".format(identifier))


Expand All @@ -566,7 +569,7 @@ def _process_service_principal_creds(cli_ctx, years, app_start_date, app_end_dat

if not any((cert, create_cert, password, keyvault)):
# 1 - Simplest scenario. Use random password
return str(uuid.uuid4()), None, None, None, None
return str(_gen_guid()), None, None, None, None

if password:
# 2 - Password supplied -- no certs
Expand Down Expand Up @@ -934,7 +937,7 @@ def reset_service_principal_credential(cmd, name, password=None, create_cert=Fal
app_creds.append(PasswordCredential(
start_date=app_start_date,
end_date=app_end_date,
key_id=str(uuid.uuid4()),
key_id=str(_gen_guid()),
value=password
))

Expand All @@ -946,7 +949,7 @@ def reset_service_principal_credential(cmd, name, password=None, create_cert=Fal
start_date=app_start_date,
end_date=app_end_date,
value=public_cert_string,
key_id=str(uuid.uuid4()),
key_id=str(_gen_guid()),
usage='Verify',
type='AsymmetricX509Cert'
))
Expand All @@ -966,22 +969,41 @@ def reset_service_principal_credential(cmd, name, password=None, create_cert=Fal
return result


def _resolve_object_id(cli_ctx, assignee):
def _resolve_object_id(cli_ctx, assignee, fallback_to_object_id=False):
client = _graph_client_factory(cli_ctx)
result = None
if assignee.find('@') >= 0: # looks like a user principal name
result = list(client.users.list(filter="userPrincipalName eq '{}'".format(assignee)))
if not result:
result = list(client.service_principals.list(
filter="servicePrincipalNames/any(c:c eq '{}')".format(assignee)))
if not result: # assume an object id, let us verify it
result = _get_object_stubs(client, [assignee])
try:
if assignee.find('@') >= 0: # looks like a user principal name
result = list(client.users.list(filter="userPrincipalName eq '{}'".format(assignee)))
if not result:
result = list(client.service_principals.list(
filter="servicePrincipalNames/any(c:c eq '{}')".format(assignee)))
if not result: # assume an object id, let us verify it
result = _get_object_stubs(client, [assignee])

# 2+ matches should never happen, so we only check 'no match' here
if not result:
raise CLIError("No matches in graph database for '{}'".format(assignee))

return result[0].object_id
except (CloudError, GraphErrorException):
if fallback_to_object_id and _is_guid(assignee):
return assignee
raise


def _is_guid(guid):
try:
uuid.UUID(guid)
return True
except ValueError:
pass
return False

# 2+ matches should never happen, so we only check 'no match' here
if not result:
raise CLIError("No matches in graph database for '{}'".format(assignee))

return result[0].object_id
# for injecting test seams to produce predicatable role assignment id for playback
def _gen_guid():
return uuid.uuid4()


def _get_object_stubs(graph_client, assignees):
Expand Down

Large diffs are not rendered by default.

Loading