diff --git a/sdk/identity/azure-identity/azure/identity/_constants.py b/sdk/identity/azure-identity/azure/identity/_constants.py index 373f4b2844b4..ee98638987ab 100644 --- a/sdk/identity/azure-identity/azure/identity/_constants.py +++ b/sdk/identity/azure-identity/azure/identity/_constants.py @@ -5,7 +5,8 @@ AZURE_CLI_CLIENT_ID = "04b07795-8ddb-461a-bbee-02f9e1bf7b46" - +AZURE_VSCODE_CLIENT_ID = "aebc6443-996d-45c2-90f0-388ff96faa56" +VSCODE_CREDENTIALS_SECTION = "VS Code Azure" class KnownAuthorities: AZURE_CHINA = "login.chinacloudapi.cn" diff --git a/sdk/identity/azure-identity/azure/identity/_credentials/default.py b/sdk/identity/azure-identity/azure/identity/_credentials/default.py index 7432867a3ac6..74a05de08904 100644 --- a/sdk/identity/azure-identity/azure/identity/_credentials/default.py +++ b/sdk/identity/azure-identity/azure/identity/_credentials/default.py @@ -13,6 +13,7 @@ from .managed_identity import ManagedIdentityCredential from .shared_cache import SharedTokenCacheCredential from .azure_cli import AzureCliCredential +from .vscode_credential import VSCodeCredential try: @@ -39,7 +40,8 @@ class DefaultAzureCredential(ChainedTokenCredential): 3. On Windows only: a user who has signed in with a Microsoft application, such as Visual Studio. If multiple identities are in the cache, then the value of the environment variable ``AZURE_USERNAME`` is used to select which identity to use. See :class:`~azure.identity.SharedTokenCacheCredential` for more details. - 4. The identity currently logged in to the Azure CLI. + 4. The user currently signed in to Visual Studio Code. + 5. The identity currently logged in to the Azure CLI. This default behavior is configurable with keyword arguments. @@ -51,6 +53,8 @@ class DefaultAzureCredential(ChainedTokenCredential): variables from the credential. Defaults to **False**. :keyword bool exclude_managed_identity_credential: Whether to exclude managed identity from the credential. Defaults to **False**. + :keyword bool exclude_visual_studio_code_credential: Whether to exclude stored credential from VS Code. + Defaults to **False**. :keyword bool exclude_shared_token_cache_credential: Whether to exclude the shared token cache. Defaults to **False**. :keyword bool exclude_interactive_browser_credential: Whether to exclude interactive browser authentication (see @@ -73,6 +77,7 @@ def __init__(self, **kwargs): exclude_environment_credential = kwargs.pop("exclude_environment_credential", False) exclude_managed_identity_credential = kwargs.pop("exclude_managed_identity_credential", False) exclude_shared_token_cache_credential = kwargs.pop("exclude_shared_token_cache_credential", False) + exclude_visual_studio_code_credential = kwargs.pop("exclude_visual_studio_code_credential", False) exclude_cli_credential = kwargs.pop("exclude_cli_credential", False) exclude_interactive_browser_credential = kwargs.pop("exclude_interactive_browser_credential", True) @@ -91,6 +96,8 @@ def __init__(self, **kwargs): except Exception as ex: # pylint:disable=broad-except # transitive dependency pywin32 doesn't support 3.8 (https://github.com/mhammond/pywin32/issues/1431) _LOGGER.info("Shared token cache is unavailable: '%s'", ex) + if not exclude_visual_studio_code_credential: + credentials.append(VSCodeCredential()) if not exclude_cli_credential: credentials.append(AzureCliCredential()) if not exclude_interactive_browser_credential: diff --git a/sdk/identity/azure-identity/azure/identity/_credentials/linux_vscode_adapter.py b/sdk/identity/azure-identity/azure/identity/_credentials/linux_vscode_adapter.py new file mode 100644 index 000000000000..2333f3c48196 --- /dev/null +++ b/sdk/identity/azure-identity/azure/identity/_credentials/linux_vscode_adapter.py @@ -0,0 +1,73 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +import os +import json +import ctypes as ct +from .._constants import VSCODE_CREDENTIALS_SECTION + + +def _c_str(string): + return ct.c_char_p(string.encode('utf-8')) + + +try: + _libsecret = ct.cdll.LoadLibrary('libsecret-1.so.0') + _libsecret.secret_schema_new.argtypes = \ + [ct.c_char_p, ct.c_uint, ct.c_char_p, ct.c_uint, ct.c_char_p, ct.c_uint, ct.c_void_p] + _libsecret.secret_password_lookup_sync.argtypes = \ + [ct.c_void_p, ct.c_void_p, ct.c_void_p, ct.c_char_p, ct.c_char_p, ct.c_char_p, ct.c_char_p, ct.c_void_p] + _libsecret.secret_password_lookup_sync.restype = ct.c_char_p + _libsecret.secret_schema_unref.argtypes = [ct.c_void_p] +except OSError: + _libsecret = None + + +def _get_user_settings_path(): + app_data_folder = os.environ['HOME'] + return os.path.join(app_data_folder, ".config", "Code", "User", "settings.json") + + +def _get_user_settings(): + path = _get_user_settings_path() + try: + with open(path) as file: + data = json.load(file) + environment_name = data.get("azure.cloud", "Azure") + return environment_name + except IOError: + return "Azure" + + +def _get_refresh_token(service_name, account_name): + if not _libsecret: + return None + + # _libsecret.secret_password_lookup_sync raises segment fault on Python 2.7 + # temporarily disable it on 2.7 + import sys + if sys.version_info[0] < 3: + raise NotImplementedError("Not supported on Python 2.7") + + err = ct.c_int() + schema = _libsecret.secret_schema_new(_c_str("org.freedesktop.Secret.Generic"), 2, + _c_str("service"), 0, _c_str("account"), 0, None) + p_str = _libsecret.secret_password_lookup_sync(schema, None, ct.byref(err), _c_str("service"), _c_str(service_name), + _c_str("account"), _c_str(account_name), None) + _libsecret.secret_schema_unref(schema) + if err.value == 0: + return p_str.decode('utf-8') + + return None + + +def get_credentials(): + try: + environment_name = _get_user_settings() + credentials = _get_refresh_token(VSCODE_CREDENTIALS_SECTION, environment_name) + return credentials + except NotImplementedError: # pylint:disable=try-except-raise + raise + except Exception: #pylint: disable=broad-except + return None diff --git a/sdk/identity/azure-identity/azure/identity/_credentials/macos_vscode_adapter.py b/sdk/identity/azure-identity/azure/identity/_credentials/macos_vscode_adapter.py new file mode 100644 index 000000000000..915f46e2aa9f --- /dev/null +++ b/sdk/identity/azure-identity/azure/identity/_credentials/macos_vscode_adapter.py @@ -0,0 +1,41 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +import os +import json +from msal_extensions.osx import Keychain, KeychainError +from .._constants import VSCODE_CREDENTIALS_SECTION + + +def _get_user_settings_path(): + app_data_folder = os.environ['USER'] + return os.path.join(app_data_folder, "Library", "Application Support", "Code", "User", "settings.json") + + +def _get_user_settings(): + path = _get_user_settings_path() + try: + with open(path) as file: + data = json.load(file) + environment_name = data.get("azure.cloud", "Azure") + return environment_name + except IOError: + return "Azure" + + +def _get_refresh_token(service_name, account_name): + key_chain = Keychain() + try: + return key_chain.get_generic_password(service_name, account_name) + except KeychainError: + return None + + +def get_credentials(): + try: + environment_name = _get_user_settings() + credentials = _get_refresh_token(VSCODE_CREDENTIALS_SECTION, environment_name) + return credentials + except Exception: #pylint: disable=broad-except + return None diff --git a/sdk/identity/azure-identity/azure/identity/_credentials/vscode_credential.py b/sdk/identity/azure-identity/azure/identity/_credentials/vscode_credential.py new file mode 100644 index 000000000000..ca8abf2960ca --- /dev/null +++ b/sdk/identity/azure-identity/azure/identity/_credentials/vscode_credential.py @@ -0,0 +1,53 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +import sys +from typing import TYPE_CHECKING +from .._exceptions import CredentialUnavailableError +from .._constants import AZURE_VSCODE_CLIENT_ID +from .._internal.aad_client import AadClient +if sys.platform.startswith('win'): + from .win_vscode_adapter import get_credentials +elif sys.platform.startswith('darwin'): + from .macos_vscode_adapter import get_credentials +else: + from .linux_vscode_adapter import get_credentials + +if TYPE_CHECKING: + # pylint:disable=unused-import,ungrouped-imports + from typing import Any, Iterable, Optional + from azure.core.credentials import AccessToken + + +class VSCodeCredential(object): + """Authenticates by redeeming a refresh token previously saved by VS Code + + """ + def __init__(self, **kwargs): + self._client = kwargs.pop("_client", None) or AadClient("organizations", AZURE_VSCODE_CLIENT_ID, **kwargs) + + def get_token(self, *scopes, **kwargs): + # type: (*str, **Any) -> AccessToken + """Request an access token for `scopes`. + + .. note:: This method is called by Azure SDK clients. It isn't intended for use in application code. + + When this method is called, the credential will try to get the refresh token saved by VS Code. If a refresh + token can be found, it will redeem the refresh token for an access token and return the access token. + + :param str scopes: desired scopes for the access token. This method requires at least one scope. + :rtype: :class:`azure.core.credentials.AccessToken` + :raises ~azure.identity.CredentialUnavailableError: fail to get refresh token. + """ + if not scopes: + raise ValueError("'get_token' requires at least one scope") + + refresh_token = get_credentials() + if not refresh_token: + raise CredentialUnavailableError( + message="No Azure user is logged in to Visual Studio Code." + ) + token = self._client.get_cached_access_token(scopes) \ + or self._client.obtain_token_by_refresh_token(refresh_token, scopes, **kwargs) + return token diff --git a/sdk/identity/azure-identity/azure/identity/_credentials/win_vscode_adapter.py b/sdk/identity/azure-identity/azure/identity/_credentials/win_vscode_adapter.py new file mode 100644 index 000000000000..04726e66cd8b --- /dev/null +++ b/sdk/identity/azure-identity/azure/identity/_credentials/win_vscode_adapter.py @@ -0,0 +1,84 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +import os +import json +import ctypes as ct +from .._constants import VSCODE_CREDENTIALS_SECTION +try: + import ctypes.wintypes as wt +except (IOError, ValueError): + pass + + +SUPPORTED_CREDKEYS = set(( + 'Type', 'TargetName', 'Persist', + 'UserName', 'Comment', 'CredentialBlob')) + + +class _CREDENTIAL(ct.Structure): + _fields_ = [ + ("Flags", wt.DWORD), + ("Type", wt.DWORD), + ("TargetName", ct.c_wchar_p), + ("Comment", ct.c_wchar_p), + ("LastWritten", wt.FILETIME), + ("CredentialBlobSize", wt.DWORD), + ("CredentialBlob", wt.LPBYTE), + ("Persist", wt.DWORD), + ("AttributeCount", wt.DWORD), + ("Attributes", ct.c_void_p), + ("TargetAlias", ct.c_wchar_p), + ("UserName", ct.c_wchar_p)] + + +_PCREDENTIAL = ct.POINTER(_CREDENTIAL) + +_advapi = ct.WinDLL('advapi32') +_advapi.CredReadW.argtypes = [wt.LPCWSTR, wt.DWORD, wt.DWORD, ct.POINTER(_PCREDENTIAL)] +_advapi.CredReadW.restype = wt.BOOL +_advapi.CredFree.argtypes = [_PCREDENTIAL] + + +def _read_credential(service_name, account_name): + target = "{}/{}".format(service_name, account_name) + cred_ptr = _PCREDENTIAL() + if _advapi.CredReadW(target, 1, 0, ct.byref(cred_ptr)): + cred_blob = cred_ptr.contents.CredentialBlob + cred_blob_size = cred_ptr.contents.CredentialBlobSize + password_as_list = [int.from_bytes(cred_blob[pos:pos + 1], 'little') + for pos in range(0, cred_blob_size)] + cred = ''.join(map(chr, password_as_list)) + _advapi.CredFree(cred_ptr) + return cred + return None + + +def _get_user_settings_path(): + app_data_folder = os.environ['APPDATA'] + return os.path.join(app_data_folder, "Code", "User", "settings.json") + + +def _get_user_settings(): + path = _get_user_settings_path() + try: + with open(path) as file: + data = json.load(file) + environment_name = data.get("azure.cloud", "Azure") + return environment_name + except IOError: + return "Azure" + + +def _get_refresh_token(service_name, account_name): + return _read_credential(service_name, account_name) + + +def get_credentials(): + try: + environment_name = _get_user_settings() + credentials = _get_refresh_token(VSCODE_CREDENTIALS_SECTION, environment_name) + return credentials + except Exception: #pylint: disable=broad-except + return None diff --git a/sdk/identity/azure-identity/azure/identity/aio/_credentials/default.py b/sdk/identity/azure-identity/azure/identity/aio/_credentials/default.py index f8feca5fb0ea..111511f52a4f 100644 --- a/sdk/identity/azure-identity/azure/identity/aio/_credentials/default.py +++ b/sdk/identity/azure-identity/azure/identity/aio/_credentials/default.py @@ -13,6 +13,7 @@ from .environment import EnvironmentCredential from .managed_identity import ManagedIdentityCredential from .shared_cache import SharedTokenCacheCredential +from .vscode_credential import VSCodeCredential if TYPE_CHECKING: from typing import Any @@ -32,7 +33,8 @@ class DefaultAzureCredential(ChainedTokenCredential): 3. On Windows only: a user who has signed in with a Microsoft application, such as Visual Studio. If multiple identities are in the cache, then the value of the environment variable ``AZURE_USERNAME`` is used to select which identity to use. See :class:`~azure.identity.aio.SharedTokenCacheCredential` for more details. - 4. The identity currently logged in to the Azure CLI. + 4. The user currently signed in to Visual Studio Code. + 5. The identity currently logged in to the Azure CLI. This default behavior is configurable with keyword arguments. @@ -42,6 +44,8 @@ class DefaultAzureCredential(ChainedTokenCredential): :keyword bool exclude_cli_credential: Whether to exclude the Azure CLI from the credential. Defaults to **False**. :keyword bool exclude_environment_credential: Whether to exclude a service principal configured by environment variables from the credential. Defaults to **False**. + :keyword bool exclude_visual_studio_code_credential: Whether to exclude stored credential from VS Code. + Defaults to **False**. :keyword bool exclude_managed_identity_credential: Whether to exclude managed identity from the credential. Defaults to **False**. :keyword bool exclude_shared_token_cache_credential: Whether to exclude the shared token cache. Defaults to @@ -61,6 +65,7 @@ def __init__(self, **kwargs): "shared_cache_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID) ) + exclude_visual_studio_code_credential = kwargs.pop("exclude_visual_studio_code_credential", False) exclude_cli_credential = kwargs.pop("exclude_cli_credential", False) exclude_environment_credential = kwargs.pop("exclude_environment_credential", False) exclude_managed_identity_credential = kwargs.pop("exclude_managed_identity_credential", False) @@ -81,6 +86,8 @@ def __init__(self, **kwargs): except Exception as ex: # pylint:disable=broad-except # transitive dependency pywin32 doesn't support 3.8 (https://github.com/mhammond/pywin32/issues/1431) _LOGGER.info("Shared token cache is unavailable: '%s'", ex) + if not exclude_visual_studio_code_credential: + credentials.append(VSCodeCredential()) if not exclude_cli_credential: credentials.append(AzureCliCredential()) diff --git a/sdk/identity/azure-identity/azure/identity/aio/_credentials/vscode_credential.py b/sdk/identity/azure-identity/azure/identity/aio/_credentials/vscode_credential.py new file mode 100644 index 000000000000..963eb87c76d4 --- /dev/null +++ b/sdk/identity/azure-identity/azure/identity/aio/_credentials/vscode_credential.py @@ -0,0 +1,58 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +from typing import TYPE_CHECKING +from ..._exceptions import CredentialUnavailableError +from .._credentials.base import AsyncCredentialBase +from ..._constants import AZURE_VSCODE_CLIENT_ID +from .._internal.aad_client import AadClient +from ..._credentials.vscode_credential import get_credentials +if TYPE_CHECKING: + # pylint:disable=unused-import,ungrouped-imports + from typing import Any, Iterable, Optional + from azure.core.credentials import AccessToken + +class VSCodeCredential(AsyncCredentialBase): + """Authenticates by redeeming a refresh token previously saved by VS Code + + """ + def __init__(self, **kwargs): + self._client = kwargs.pop("_client", None) or AadClient("organizations", AZURE_VSCODE_CLIENT_ID, **kwargs) + + async def __aenter__(self): + if self._client: + await self._client.__aenter__() + return self + + async def close(self): + """Close the credential's transport session.""" + + if self._client: + await self._client.__aexit__() + + async def get_token(self, *scopes, **kwargs): + # type: (*str, **Any) -> AccessToken + """Request an access token for `scopes`. + + .. note:: This method is called by Azure SDK clients. It isn't intended for use in application code. + + When this method is called, the credential will try to get the refresh token saved by VS Code. If a refresh + token can be found, it will redeem the refresh token for an access token and return the access token. + + :param str scopes: desired scopes for the access token. This method requires at least one scope. + :rtype: :class:`azure.core.credentials.AccessToken` + :raises ~azure.identity.CredentialUnavailableError: fail to get refresh token. + """ + if not scopes: + raise ValueError("'get_token' requires at least one scope") + + refresh_token = get_credentials() + if not refresh_token: + raise CredentialUnavailableError( + message="No Azure user is logged in to Visual Studio Code." + ) + token = self._client.get_cached_access_token(scopes) + if not token: + token = await self._client.obtain_token_by_refresh_token(refresh_token, scopes, **kwargs) + return token diff --git a/sdk/identity/azure-identity/tests/test_default.py b/sdk/identity/azure-identity/tests/test_default.py index 5f0c79ae9b1d..25c09cdff694 100644 --- a/sdk/identity/azure-identity/tests/test_default.py +++ b/sdk/identity/azure-identity/tests/test_default.py @@ -15,6 +15,7 @@ from azure.identity._constants import EnvironmentVariables from azure.identity._credentials.azure_cli import AzureCliCredential from azure.identity._credentials.managed_identity import ManagedIdentityCredential +from azure.identity._credentials.vscode_credential import VSCodeCredential import pytest from six.moves.urllib_parse import urlparse @@ -120,6 +121,9 @@ def assert_credentials_not_present(chain, *excluded_credential_classes): credential = DefaultAzureCredential(exclude_cli_credential=True) assert_credentials_not_present(credential, AzureCliCredential) + credential = DefaultAzureCredential(exclude_visual_studio_code_credential=True) + assert_credentials_not_present(credential, VSCodeCredential) + # interactive auth is excluded by default credential = DefaultAzureCredential(exclude_interactive_browser_credential=False) actual = {c.__class__ for c in credential.credentials} @@ -127,6 +131,7 @@ def assert_credentials_not_present(chain, *excluded_credential_classes): assert actual - default == {InteractiveBrowserCredential} + def test_shared_cache_tenant_id(): expected_access_token = "expected-access-token" refresh_token_a = "refresh-token-a" diff --git a/sdk/identity/azure-identity/tests/test_default_async.py b/sdk/identity/azure-identity/tests/test_default_async.py index 463aaa235256..c8eb0b17c4b2 100644 --- a/sdk/identity/azure-identity/tests/test_default_async.py +++ b/sdk/identity/azure-identity/tests/test_default_async.py @@ -12,6 +12,7 @@ from azure.identity.aio import DefaultAzureCredential, SharedTokenCacheCredential from azure.identity.aio._credentials.azure_cli import AzureCliCredential from azure.identity.aio._credentials.managed_identity import ManagedIdentityCredential +from azure.identity.aio._credentials.vscode_credential import VSCodeCredential from azure.identity._constants import EnvironmentVariables import pytest @@ -115,6 +116,9 @@ def assert_credentials_not_present(chain, *credential_classes): credential = DefaultAzureCredential(exclude_cli_credential=True) assert_credentials_not_present(credential, AzureCliCredential) + credential = DefaultAzureCredential(exclude_visual_studio_code_credential=True) + assert_credentials_not_present(credential, VSCodeCredential) + @pytest.mark.asyncio async def test_shared_cache_tenant_id(): diff --git a/sdk/identity/azure-identity/tests/test_vscode_credential.py b/sdk/identity/azure-identity/tests/test_vscode_credential.py new file mode 100644 index 000000000000..c4ea040358ff --- /dev/null +++ b/sdk/identity/azure-identity/tests/test_vscode_credential.py @@ -0,0 +1,90 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +import sys +import pytest +from azure.core.credentials import AccessToken +from azure.identity import CredentialUnavailableError +from azure.core.pipeline.policies import SansIOHTTPPolicy +from azure.identity._internal.user_agent import USER_AGENT +from helpers import build_aad_response, mock_response, Request, validating_transport +try: + from unittest import mock +except ImportError: # python < 3.3 + import mock +from azure.identity._credentials.vscode_credential import VSCodeCredential, get_credentials + + +def test_no_scopes(): + """The credential should raise ValueError when get_token is called with no scopes""" + + credential = VSCodeCredential() + with pytest.raises(ValueError): + credential.get_token() + + +def test_policies_configurable(): + policy = mock.Mock(spec_set=SansIOHTTPPolicy, on_request=mock.Mock()) + + def send(*_, **__): + return mock_response(json_payload=build_aad_response(access_token="**")) + + with mock.patch(VSCodeCredential.__module__ + ".get_credentials", return_value="VALUE"): + credential = VSCodeCredential(policies=[policy], transport=mock.Mock(send=send)) + credential.get_token("scope") + assert policy.on_request.called + + +def test_user_agent(): + transport = validating_transport( + requests=[Request(required_headers={"User-Agent": USER_AGENT})], + responses=[mock_response(json_payload=build_aad_response(access_token="**"))], + ) + + with mock.patch(VSCodeCredential.__module__ + ".get_credentials", return_value="VALUE"): + credential = VSCodeCredential(transport=transport) + credential.get_token("scope") + + +def test_credential_unavailable_error(): + with mock.patch(VSCodeCredential.__module__ + ".get_credentials", return_value=None): + credential = VSCodeCredential() + with pytest.raises(CredentialUnavailableError): + token = credential.get_token("scope") + + +def test_redeem_token(): + expected_token = AccessToken("token", 42) + + mock_client = mock.Mock(spec=object) + mock_client.obtain_token_by_refresh_token = mock.Mock(return_value=expected_token) + mock_client.get_cached_access_token = mock.Mock(return_value=None) + + with mock.patch(VSCodeCredential.__module__ + ".get_credentials", return_value="VALUE"): + credential = VSCodeCredential(_client=mock_client) + token = credential.get_token("scope") + assert token is expected_token + mock_client.obtain_token_by_refresh_token.assert_called_with('VALUE', ('scope',)) + assert mock_client.obtain_token_by_refresh_token.call_count == 1 + + +@pytest.mark.skipif(not sys.platform.startswith('darwin'), reason="This test only runs on MacOS") +def test_mac_keychain_valid_value(): + with mock.patch('Keychain.get_generic_password', return_value="VALUE"): + assert get_credentials() == "VALUE" + + +@pytest.mark.skipif(not sys.platform.startswith('darwin'), reason="This test only runs on MacOS") +def test_mac_keychain_error(): + from msal_extensions.osx import Keychain, KeychainError + with mock.patch.object(Keychain, 'get_generic_password', side_effect=KeychainError()): + credential = VSCodeCredential() + with pytest.raises(CredentialUnavailableError): + token = credential.get_token("scope") + + +@pytest.mark.skipif(not sys.platform.startswith("linux"), reason="This test only runs on Linux") +def test_get_token(): + with mock.patch('azure.identity._credentials.linux_vscode_adapter._get_refresh_token', return_value="VALUE"): + assert get_credentials() == "VALUE" diff --git a/sdk/identity/azure-identity/tests/test_vscode_credential_async.py b/sdk/identity/azure-identity/tests/test_vscode_credential_async.py new file mode 100644 index 000000000000..c466054edc9d --- /dev/null +++ b/sdk/identity/azure-identity/tests/test_vscode_credential_async.py @@ -0,0 +1,71 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +import sys +import pytest +from azure.core.credentials import AccessToken +from azure.identity._internal.user_agent import USER_AGENT +from azure.identity import CredentialUnavailableError +from azure.core.pipeline.policies import SansIOHTTPPolicy +from helpers import build_aad_response, mock_response, Request +from helpers_async import async_validating_transport, AsyncMockTransport, wrap_in_future +from unittest import mock +from azure.identity.aio._credentials.vscode_credential import VSCodeCredential + + +@pytest.mark.asyncio +async def test_no_scopes(): + """The credential should raise ValueError when get_token is called with no scopes""" + + credential = VSCodeCredential() + with pytest.raises(ValueError): + await credential.get_token() + + +@pytest.mark.asyncio +async def test_policies_configurable(): + policy = mock.Mock(spec_set=SansIOHTTPPolicy, on_request=mock.Mock()) + + async def send(*_, **__): + return mock_response(json_payload=build_aad_response(access_token="**")) + + with mock.patch(VSCodeCredential.__module__ + ".get_credentials", return_value="VALUE"): + credential = VSCodeCredential(policies=[policy], transport=mock.Mock(send=send)) + await credential.get_token("scope") + assert policy.on_request.called + + +@pytest.mark.asyncio +async def test_user_agent(): + transport = async_validating_transport( + requests=[Request(required_headers={"User-Agent": USER_AGENT})], + responses=[mock_response(json_payload=build_aad_response(access_token="**"))], + ) + + with mock.patch(VSCodeCredential.__module__ + ".get_credentials", return_value="VALUE"): + credential = VSCodeCredential(transport=transport) + await credential.get_token("scope") + + +@pytest.mark.asyncio +async def test_credential_unavailable_error(): + with mock.patch(VSCodeCredential.__module__ + ".get_credentials", return_value=None): + credential = VSCodeCredential() + with pytest.raises(CredentialUnavailableError): + token = await credential.get_token("scope") + + +@pytest.mark.asyncio +async def test_redeem_token(): + expected_token = AccessToken("token", 42) + + mock_client = mock.Mock(spec=object) + token_by_refresh_token = mock.Mock(return_value=expected_token) + mock_client.obtain_token_by_refresh_token = wrap_in_future(token_by_refresh_token) + mock_client.get_cached_access_token = mock.Mock(return_value=None) + + with mock.patch(VSCodeCredential.__module__ + ".get_credentials", return_value="VALUE"): + credential = VSCodeCredential(_client=mock_client) + token = await credential.get_token("scope") + assert token is expected_token