Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions src/azure-cli-core/azure/cli/core/adal_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,7 @@ def _get_token(self, sdk_resource=None):
def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
logger.debug("AdalAuthentication.get_token invoked by Track 2 SDK with scopes=%s", scopes)

# Deal with an old Track 2 SDK issue where the default credential_scopes is extended with
# custom credential_scopes. Instead, credential_scopes should be replaced by custom credential_scopes.
# https://github.com/Azure/azure-sdk-for-python/issues/12947
# We simply remove the first one if there are multiple scopes provided.
if len(scopes) > 1:
scopes = scopes[1:]

_, token, full_token, _ = self._get_token(scopes_to_resource(scopes))
_, token, full_token, _ = self._get_token(_try_scopes_to_resource(scopes))
try:
return AccessToken(token, int(full_token['expiresIn'] + time.time()))
except KeyError: # needed to deal with differing unserialized MSI token payload
Expand Down Expand Up @@ -106,7 +99,10 @@ class MSIAuthenticationWrapper(MSIAuthentication):
# This method is exposed for Azure Core. Add *scopes, **kwargs to fit azure.core requirement
def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
logger.debug("MSIAuthenticationWrapper.get_token invoked by Track 2 SDK with scopes=%s", scopes)
self.resource = scopes_to_resource(scopes)
resource = _try_scopes_to_resource(scopes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the resource is None, which resource is used to get token then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The resource used to create MSIAuthenticationWrapper will be used.

return MSIAuthenticationWrapper(resource=resource)

if resource:
# If available, use resource provided by SDK
self.resource = resource
self.set_token()
return AccessToken(self.token['access_token'], int(self.token['expires_on']))

Expand Down Expand Up @@ -135,3 +131,25 @@ def set_token(self):
def signed_session(self, session=None):
logger.debug("MSIAuthenticationWrapper.signed_session invoked by Track 1 SDK")
super().signed_session(session)


def _try_scopes_to_resource(scopes):
"""Wrap scopes_to_resource to workaround some SDK issues."""

# Track 2 SDKs generated before https://github.com/Azure/autorest.python/pull/239 don't maintain
# credential_scopes and call `get_token` with empty scopes.
# As a workaround, return None so that the CLI-managed resource is used.
if not scopes:
logger.debug("No scope is provided by the SDK, use the CLI-managed resource.")
return None

# Track 2 SDKs generated before https://github.com/Azure/autorest.python/pull/745 extend default
# credential_scopes with custom credential_scopes. Instead, credential_scopes should be replaced by
# custom credential_scopes. https://github.com/Azure/azure-sdk-for-python/issues/12947
# As a workaround, remove the first one if there are multiple scopes provided.
if len(scopes) > 1:
logger.debug("Multiple scopes are provided by the SDK, discarding the first one: %s", scopes[0])
return scopes_to_resource(scopes[1:])

# Exactly only one scope is provided
return scopes_to_resource(scopes)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

# pylint: disable=line-too-long
import unittest
from azure.cli.core.adal_authentication import _try_scopes_to_resource


class TestUtils(unittest.TestCase):

def test_try_scopes_to_resource(self):
# Test no scopes
self.assertIsNone(_try_scopes_to_resource(()))
self.assertIsNone(_try_scopes_to_resource([]))
self.assertIsNone(_try_scopes_to_resource(None))

# Test multiple scopes, with the first one discarded
resource = _try_scopes_to_resource(("https://management.core.windows.net//.default",
"https://management.core.chinacloudapi.cn//.default"))
self.assertEqual(resource, "https://management.core.chinacloudapi.cn/")

# Test single scopes (the correct usage)
resource = _try_scopes_to_resource(("https://management.core.chinacloudapi.cn//.default",))
self.assertEqual(resource, "https://management.core.chinacloudapi.cn/")


if __name__ == '__main__':
unittest.main()