diff --git a/src/azure-cli-core/azure/cli/core/adal_authentication.py b/src/azure-cli-core/azure/cli/core/adal_authentication.py index a4496b05d7a..4d42159d5f9 100644 --- a/src/azure-cli-core/azure/cli/core/adal_authentication.py +++ b/src/azure-cli-core/azure/cli/core/adal_authentication.py @@ -68,14 +68,7 @@ def _get_token(self, sdk_resource=None): def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument logger.debug("AdalAuthentication.get_token invoked by Track 2 SDK with scopes=%s", scopes) - # Deal with an old Track 2 SDK issue where the default credential_scopes is extended with - # custom credential_scopes. Instead, credential_scopes should be replaced by custom credential_scopes. - # https://github.com/Azure/azure-sdk-for-python/issues/12947 - # We simply remove the first one if there are multiple scopes provided. - if len(scopes) > 1: - scopes = scopes[1:] - - _, token, full_token, _ = self._get_token(scopes_to_resource(scopes)) + _, token, full_token, _ = self._get_token(_try_scopes_to_resource(scopes)) try: return AccessToken(token, int(full_token['expiresIn'] + time.time())) except KeyError: # needed to deal with differing unserialized MSI token payload @@ -106,7 +99,10 @@ class MSIAuthenticationWrapper(MSIAuthentication): # This method is exposed for Azure Core. Add *scopes, **kwargs to fit azure.core requirement def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument logger.debug("MSIAuthenticationWrapper.get_token invoked by Track 2 SDK with scopes=%s", scopes) - self.resource = scopes_to_resource(scopes) + resource = _try_scopes_to_resource(scopes) + if resource: + # If available, use resource provided by SDK + self.resource = resource self.set_token() return AccessToken(self.token['access_token'], int(self.token['expires_on'])) @@ -135,3 +131,25 @@ def set_token(self): def signed_session(self, session=None): logger.debug("MSIAuthenticationWrapper.signed_session invoked by Track 1 SDK") super().signed_session(session) + + +def _try_scopes_to_resource(scopes): + """Wrap scopes_to_resource to workaround some SDK issues.""" + + # Track 2 SDKs generated before https://github.com/Azure/autorest.python/pull/239 don't maintain + # credential_scopes and call `get_token` with empty scopes. + # As a workaround, return None so that the CLI-managed resource is used. + if not scopes: + logger.debug("No scope is provided by the SDK, use the CLI-managed resource.") + return None + + # Track 2 SDKs generated before https://github.com/Azure/autorest.python/pull/745 extend default + # credential_scopes with custom credential_scopes. Instead, credential_scopes should be replaced by + # custom credential_scopes. https://github.com/Azure/azure-sdk-for-python/issues/12947 + # As a workaround, remove the first one if there are multiple scopes provided. + if len(scopes) > 1: + logger.debug("Multiple scopes are provided by the SDK, discarding the first one: %s", scopes[0]) + return scopes_to_resource(scopes[1:]) + + # Exactly only one scope is provided + return scopes_to_resource(scopes) diff --git a/src/azure-cli-core/azure/cli/core/tests/test_adal_authentication.py b/src/azure-cli-core/azure/cli/core/tests/test_adal_authentication.py new file mode 100644 index 00000000000..be660f2e751 --- /dev/null +++ b/src/azure-cli-core/azure/cli/core/tests/test_adal_authentication.py @@ -0,0 +1,30 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +# pylint: disable=line-too-long +import unittest +from azure.cli.core.adal_authentication import _try_scopes_to_resource + + +class TestUtils(unittest.TestCase): + + def test_try_scopes_to_resource(self): + # Test no scopes + self.assertIsNone(_try_scopes_to_resource(())) + self.assertIsNone(_try_scopes_to_resource([])) + self.assertIsNone(_try_scopes_to_resource(None)) + + # Test multiple scopes, with the first one discarded + resource = _try_scopes_to_resource(("https://management.core.windows.net//.default", + "https://management.core.chinacloudapi.cn//.default")) + self.assertEqual(resource, "https://management.core.chinacloudapi.cn/") + + # Test single scopes (the correct usage) + resource = _try_scopes_to_resource(("https://management.core.chinacloudapi.cn//.default",)) + self.assertEqual(resource, "https://management.core.chinacloudapi.cn/") + + +if __name__ == '__main__': + unittest.main()