Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions src/azure-cli-core/azure/cli/core/auth/credential_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,24 @@ def __init__(self, credential, auxiliary_credentials=None):
self._auxiliary_credentials = auxiliary_credentials

def get_token(self, *scopes, **kwargs):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_token is kept as ssh extension still direclty uses it to get an SSH certificate:

https://github.com/Azure/azure-cli-extensions/blob/695bd02037a7a8abd6b0ac76ae1ac1559ae46c41/src/ssh/azext_ssh/custom.py#L232

        credential, _, _ = profile.get_login_credentials(subscription_id=profile.get_subscription()["id"])
        certificatedata = credential.get_token(*scopes, data=data)
        certificate = certificatedata.token

"""Get an access token from the main credential."""
"""Implement the old SDK token protocol azure.core.credentials.TokenCredential
Return azure.core.credentials.AccessToken
"""
logger.debug("CredentialAdaptor.get_token: scopes=%r, kwargs=%r", scopes, kwargs)

# Discard unsupported kwargs: tenant_id, enable_cae
filtered_kwargs = {}
if 'data' in kwargs:
filtered_kwargs['data'] = kwargs['data']
msal_kwargs = _prepare_msal_kwargs(kwargs)
msal_result = self._credential.acquire_token(list(scopes), **msal_kwargs)
return build_sdk_access_token(msal_result)

def get_token_info(self, *scopes, options=None):
"""Implement the new SDK token protocol azure.core.credentials.SupportsTokenInfo
Return azure.core.credentials.AccessTokenInfo
"""
logger.debug("CredentialAdaptor.get_token_info: scopes=%r, options=%r", scopes, options)

return build_sdk_access_token(self._credential.acquire_token(list(scopes), **filtered_kwargs))
msal_kwargs = _prepare_msal_kwargs(options)
msal_result = self._credential.acquire_token(list(scopes), **msal_kwargs)
return _build_sdk_access_token_info(msal_result)

def get_auxiliary_tokens(self, *scopes, **kwargs):
"""Get access tokens from auxiliary credentials."""
Expand All @@ -41,3 +50,33 @@ def get_auxiliary_tokens(self, *scopes, **kwargs):
return [build_sdk_access_token(cred.acquire_token(list(scopes), **kwargs))
for cred in self._auxiliary_credentials]
return None


def _prepare_msal_kwargs(options=None):
# Preserve supported options and discard unsupported options (tenant_id, enable_cae).
# Both get_token's kwargs and get_token_info's options are accepted as their schema is the same (at least for now).
msal_kwargs = {}
if options:
# For VM SSH. 'data' support is a CLI-specific extension.
# SDK doesn't support 'data': https://github.com/Azure/azure-sdk-for-python/pull/16397
if 'data' in options:
msal_kwargs['data'] = options['data']
Comment on lines +60 to +63
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data is currently only officially used by ssh extension:

https://github.com/Azure/azure-cli-extensions/blob/695bd02037a7a8abd6b0ac76ae1ac1559ae46c41/src/ssh/azext_ssh/custom.py#L232

        credential, _, _ = profile.get_login_credentials(subscription_id=profile.get_subscription()["id"])
        certificatedata = credential.get_token(*scopes, data=data)

# For CAE
if 'claims' in options:
msal_kwargs['claims_challenge'] = options['claims']
return msal_kwargs


def _build_sdk_access_token_info(token_entry):
Copy link
Member Author

@jiasli jiasli Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_sdk_access_token is still used by azure.cli.core.auth.adal_authentication.MSIAuthenticationWrapper.get_token:

return build_sdk_access_token(result)

so it can't be moved to src/azure-cli-core/azure/cli/core/auth/credential_adaptor.py right now.

# MSAL token entry sample:
# {
# 'access_token': 'eyJ0eXAiOiJKV...',
# 'token_type': 'Bearer',
# 'expires_in': 1618,
# 'token_source': 'cache'
# }
from .constants import ACCESS_TOKEN, EXPIRES_IN
from .util import _now_timestamp
from azure.core.credentials import AccessTokenInfo

return AccessTokenInfo(token_entry[ACCESS_TOKEN], _now_timestamp() + token_entry[EXPIRES_IN])
15 changes: 8 additions & 7 deletions src/azure-cli-core/azure/cli/core/auth/msal_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,23 @@ def __init__(self, client_id, username, **kwargs):

self._account = accounts[0]

def acquire_token(self, scopes, claims=None, **kwargs):
def acquire_token(self, scopes, claims_challenge=None, **kwargs):
# scopes must be a list.
# For acquiring SSH certificate, scopes is ['https://pas.windows.net/CheckMyAccess/Linux/.default']
# kwargs is already sanitized by CredentialAdaptor, so it can be safely passed to MSAL
logger.debug("UserCredential.acquire_token: scopes=%r, claims=%r, kwargs=%r", scopes, claims, kwargs)
logger.debug("UserCredential.acquire_token: scopes=%r, claims_challenge=%r, kwargs=%r",
scopes, claims_challenge, kwargs)

if claims:
if claims_challenge:
logger.warning('Acquiring new access token silently for tenant %s with claims challenge: %s',
self._msal_app.authority.tenant, claims)
result = self._msal_app.acquire_token_silent_with_error(scopes, self._account, claims_challenge=claims,
**kwargs)
self._msal_app.authority.tenant, claims_challenge)
result = self._msal_app.acquire_token_silent_with_error(
scopes, self._account, claims_challenge=claims_challenge, **kwargs)

from azure.cli.core.azclierror import AuthenticationError
try:
# Check if an access token is returned.
check_result(result, scopes=scopes, claims=claims)
check_result(result, scopes=scopes, claims_challenge=claims_challenge)
except AuthenticationError as ex:
# For VM SSH ('data' is passed), if getting access token fails because
# Conditional Access MFA step-up or compliance check is required, re-launch
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------


import unittest
from unittest import mock

from ..credential_adaptor import CredentialAdaptor


MOCK_ACCESS_TOKEN = "mock_access_token"
MOCK_DATA = {
'key_id': 'test',
'req_cnf': 'test',
'token_type': 'ssh-cert'
}
MOCK_CLAIMS = {"test_claims": "value2"}

class MsalCredentialStub:

def __init__(self, *args, **kwargs):
self.acquire_token_scopes = None
self.acquire_token_claims_challenge = None
self.acquire_token_kwargs = None
super().__init__()

def acquire_token(self, scopes, claims_challenge=None, **kwargs):
self.acquire_token_scopes = scopes
self.acquire_token_claims_challenge = claims_challenge
self.acquire_token_kwargs = kwargs
return {
'access_token': MOCK_ACCESS_TOKEN,
'token_type': 'Bearer',
'expires_in': 1800,
'token_source': 'cache'
}

def _now_timestamp_mock():
# 2021-09-06 08:55:23
return 1630918523


class TestCredentialAdaptor(unittest.TestCase):

@mock.patch('azure.cli.core.auth.util._now_timestamp', new=_now_timestamp_mock)
def test_get_token(self):
msal_cred = MsalCredentialStub()
sdk_cred = CredentialAdaptor(msal_cred)
access_token = sdk_cred.get_token('https://management.core.windows.net//.default')
assert msal_cred.acquire_token_scopes == ['https://management.core.windows.net//.default']

from ..util import AccessToken
assert isinstance(access_token, AccessToken)
assert access_token.token == MOCK_ACCESS_TOKEN
assert access_token.expires_on == 1630920323

# Note that SDK doesn't support 'data'. This is a CLI-specific extension.
sdk_cred.get_token('https://management.core.windows.net//.default', data=MOCK_DATA)
assert msal_cred.acquire_token_kwargs['data'] == MOCK_DATA

sdk_cred.get_token('https://management.core.windows.net//.default', claims=MOCK_CLAIMS)
assert msal_cred.acquire_token_claims_challenge == MOCK_CLAIMS


@mock.patch('azure.cli.core.auth.util._now_timestamp', new=_now_timestamp_mock)
def test_get_token_info(self):
msal_cred = MsalCredentialStub()
sdk_cred = CredentialAdaptor(msal_cred)
access_token_info = sdk_cred.get_token_info('https://management.core.windows.net//.default')

from azure.core.credentials import AccessTokenInfo
assert isinstance(access_token_info, AccessTokenInfo)
assert access_token_info.token == MOCK_ACCESS_TOKEN
assert access_token_info.expires_on == 1630920323
assert access_token_info.token_type == 'Bearer'

assert msal_cred.acquire_token_scopes == ['https://management.core.windows.net//.default']

# Note that SDK doesn't support 'data'. If 'data' were supported, it should be tested with:
sdk_cred.get_token_info('https://management.core.windows.net//.default', options={'data': MOCK_DATA})
assert msal_cred.acquire_token_kwargs['data'] == MOCK_DATA

sdk_cred.get_token_info('https://management.core.windows.net//.default', options={'claims': MOCK_CLAIMS})
assert msal_cred.acquire_token_claims_challenge == MOCK_CLAIMS


if __name__ == '__main__':
unittest.main()
4 changes: 2 additions & 2 deletions src/azure-cli-core/azure/cli/core/auth/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ def aad_error_handler(error, **kwargs):
raise AuthenticationError(error_description, msal_error=error, recommendation=recommendation)


def _generate_login_command(scopes=None, claims=None):
def _generate_login_command(scopes=None, claims_challenge=None):
login_command = ['az login']

# Rejected by Conditional Access policy, like MFA
if scopes:
login_command.append('--scope {}'.format(' '.join(scopes)))

# Rejected by CAE
if claims:
if claims_challenge:
# Explicit logout is needed: https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/335
return 'az logout\n' + ' '.join(login_command)

Expand Down