Skip to content
4 changes: 4 additions & 0 deletions sdk/identity/azure-identity/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## 1.4.0b3 (Unreleased)

- Preview of `VSCodeCredential` #10472
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still calls out VSCodeCredential, the internal type no user should think about. The sentence below makes a better bullet. Also, please make the issue reference a link.


Now `DefaultAzureCredential` can authenticate with the identity signed in to Visual
Studio Code's Azure extension.

## 1.4.0b2 (2020-04-06)
- After an instance of `DefaultAzureCredential` successfully authenticates, it
Expand Down
1 change: 1 addition & 0 deletions sdk/identity/azure-identity/azure/identity/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
AZURE_VSCODE_CLIENT_ID = "aebc6443-996d-45c2-90f0-388ff96faa56"
VSCODE_CREDENTIALS_SECTION = "VS Code Azure"


class KnownAuthorities:
AZURE_CHINA = "login.chinacloudapi.cn"
AZURE_GERMANY = "login.microsoftonline.de"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,38 @@


def _c_str(string):
return ct.c_char_p(string.encode('utf-8'))
return ct.c_char_p(string.encode("utf-8"))


try:
_libsecret = ct.cdll.LoadLibrary('libsecret-1.so.0')
_libsecret.secret_schema_new.argtypes = \
[ct.c_char_p, ct.c_uint, ct.c_char_p, ct.c_uint, ct.c_char_p, ct.c_uint, ct.c_void_p]
_libsecret.secret_password_lookup_sync.argtypes = \
[ct.c_void_p, ct.c_void_p, ct.c_void_p, ct.c_char_p, ct.c_char_p, ct.c_char_p, ct.c_char_p, ct.c_void_p]
_libsecret = ct.cdll.LoadLibrary("libsecret-1.so.0")
_libsecret.secret_schema_new.argtypes = [
ct.c_char_p,
ct.c_uint,
ct.c_char_p,
ct.c_uint,
ct.c_char_p,
ct.c_uint,
ct.c_void_p,
]
_libsecret.secret_password_lookup_sync.argtypes = [
ct.c_void_p,
ct.c_void_p,
ct.c_void_p,
ct.c_char_p,
ct.c_char_p,
ct.c_char_p,
ct.c_char_p,
ct.c_void_p,
]
_libsecret.secret_password_lookup_sync.restype = ct.c_char_p
_libsecret.secret_schema_unref.argtypes = [ct.c_void_p]
except OSError:
_libsecret = None


def _get_user_settings_path():
app_data_folder = os.environ['HOME']
app_data_folder = os.environ["HOME"]
return os.path.join(app_data_folder, ".config", "Code", "User", "settings.json")


Expand All @@ -47,17 +62,27 @@ def _get_refresh_token(service_name, account_name):
# _libsecret.secret_password_lookup_sync raises segment fault on Python 2.7
# temporarily disable it on 2.7
import sys

if sys.version_info[0] < 3:
raise NotImplementedError("Not supported on Python 2.7")

err = ct.c_int()
schema = _libsecret.secret_schema_new(_c_str("org.freedesktop.Secret.Generic"), 2,
_c_str("service"), 0, _c_str("account"), 0, None)
p_str = _libsecret.secret_password_lookup_sync(schema, None, ct.byref(err), _c_str("service"), _c_str(service_name),
_c_str("account"), _c_str(account_name), None)
schema = _libsecret.secret_schema_new(
_c_str("org.freedesktop.Secret.Generic"), 2, _c_str("service"), 0, _c_str("account"), 0, None
)
p_str = _libsecret.secret_password_lookup_sync(
schema,
None,
ct.byref(err),
_c_str("service"),
_c_str(service_name),
_c_str("account"),
_c_str(account_name),
None,
)
_libsecret.secret_schema_unref(schema)
if err.value == 0:
return p_str.decode('utf-8')
return p_str.decode("utf-8")

return None

Expand All @@ -69,5 +94,5 @@ def get_credentials():
return credentials
except NotImplementedError: # pylint:disable=try-except-raise
raise
except Exception: #pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
return None
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


def _get_user_settings_path():
app_data_folder = os.environ['USER']
app_data_folder = os.environ["USER"]
return os.path.join(app_data_folder, "Library", "Application Support", "Code", "User", "settings.json")


Expand Down Expand Up @@ -37,5 +37,5 @@ def get_credentials():
environment_name = _get_user_settings()
credentials = _get_refresh_token(VSCODE_CREDENTIALS_SECTION, environment_name)
return credentials
except Exception: #pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
return None
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from .._exceptions import CredentialUnavailableError
from .._constants import AZURE_VSCODE_CLIENT_ID
from .._internal.aad_client import AadClient
if sys.platform.startswith('win'):

if sys.platform.startswith("win"):
from .win_vscode_adapter import get_credentials
elif sys.platform.startswith('darwin'):
elif sys.platform.startswith("darwin"):
from .macos_vscode_adapter import get_credentials
else:
from .linux_vscode_adapter import get_credentials
Expand All @@ -24,8 +25,10 @@ class VSCodeCredential(object):
"""Authenticates by redeeming a refresh token previously saved by VS Code

"""

def __init__(self, **kwargs):
self._client = kwargs.pop("_client", None) or AadClient("organizations", AZURE_VSCODE_CLIENT_ID, **kwargs)
self._refresh_token = None

def get_token(self, *scopes, **kwargs):
# type: (*str, **Any) -> AccessToken
Expand All @@ -43,11 +46,15 @@ def get_token(self, *scopes, **kwargs):
if not scopes:
raise ValueError("'get_token' requires at least one scope")

refresh_token = get_credentials()
if not refresh_token:
raise CredentialUnavailableError(
message="No Azure user is logged in to Visual Studio Code."
)
token = self._client.get_cached_access_token(scopes) \
or self._client.obtain_token_by_refresh_token(refresh_token, scopes, **kwargs)
token = self._client.get_cached_access_token(scopes)

if token:
return token

if not self._refresh_token:
self._refresh_token = get_credentials()
if not self._refresh_token:
raise CredentialUnavailableError(message="No Azure user is logged in to Visual Studio Code.")

token = self._client.obtain_token_by_refresh_token(self._refresh_token, scopes, **kwargs)
return token
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
import json
import ctypes as ct
from .._constants import VSCODE_CREDENTIALS_SECTION

try:
import ctypes.wintypes as wt
except (IOError, ValueError):
pass


SUPPORTED_CREDKEYS = set((
'Type', 'TargetName', 'Persist',
'UserName', 'Comment', 'CredentialBlob'))
SUPPORTED_CREDKEYS = set(("Type", "TargetName", "Persist", "UserName", "Comment", "CredentialBlob"))

_PBYTE = ct.POINTER(ct.c_byte)


class _CREDENTIAL(ct.Structure):
Expand All @@ -25,17 +26,18 @@ class _CREDENTIAL(ct.Structure):
("Comment", ct.c_wchar_p),
("LastWritten", wt.FILETIME),
("CredentialBlobSize", wt.DWORD),
("CredentialBlob", wt.LPBYTE),
("CredentialBlob", _PBYTE),
("Persist", wt.DWORD),
("AttributeCount", wt.DWORD),
("Attributes", ct.c_void_p),
("TargetAlias", ct.c_wchar_p),
("UserName", ct.c_wchar_p)]
("UserName", ct.c_wchar_p),
]


_PCREDENTIAL = ct.POINTER(_CREDENTIAL)

_advapi = ct.WinDLL('advapi32')
_advapi = ct.WinDLL("advapi32")
_advapi.CredReadW.argtypes = [wt.LPCWSTR, wt.DWORD, wt.DWORD, ct.POINTER(_PCREDENTIAL)]
_advapi.CredReadW.restype = wt.BOOL
_advapi.CredFree.argtypes = [_PCREDENTIAL]
Expand All @@ -47,16 +49,15 @@ def _read_credential(service_name, account_name):
if _advapi.CredReadW(target, 1, 0, ct.byref(cred_ptr)):
cred_blob = cred_ptr.contents.CredentialBlob
cred_blob_size = cred_ptr.contents.CredentialBlobSize
password_as_list = [int.from_bytes(cred_blob[pos:pos + 1], 'little')
for pos in range(0, cred_blob_size)]
cred = ''.join(map(chr, password_as_list))
password_as_list = [int.from_bytes(cred_blob[pos : pos + 1], "little") for pos in range(0, cred_blob_size)]
cred = "".join(map(chr, password_as_list))
_advapi.CredFree(cred_ptr)
return cred
return None


def _get_user_settings_path():
app_data_folder = os.environ['APPDATA']
app_data_folder = os.environ["APPDATA"]
return os.path.join(app_data_folder, "Code", "User", "settings.json")


Expand All @@ -80,5 +81,5 @@ def get_credentials():
environment_name = _get_user_settings()
credentials = _get_refresh_token(VSCODE_CREDENTIALS_SECTION, environment_name)
return credentials
except Exception: #pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
return None
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@
from ..._constants import AZURE_VSCODE_CLIENT_ID
from .._internal.aad_client import AadClient
from ..._credentials.vscode_credential import get_credentials

if TYPE_CHECKING:
# pylint:disable=unused-import,ungrouped-imports
from typing import Any, Iterable, Optional
from azure.core.credentials import AccessToken


class VSCodeCredential(AsyncCredentialBase):
"""Authenticates by redeeming a refresh token previously saved by VS Code

"""

def __init__(self, **kwargs):
self._client = kwargs.pop("_client", None) or AadClient("organizations", AZURE_VSCODE_CLIENT_ID, **kwargs)
self._refresh_token = None

async def __aenter__(self):
if self._client:
Expand Down Expand Up @@ -47,12 +51,14 @@ async def get_token(self, *scopes, **kwargs):
if not scopes:
raise ValueError("'get_token' requires at least one scope")

refresh_token = get_credentials()
if not refresh_token:
raise CredentialUnavailableError(
message="No Azure user is logged in to Visual Studio Code."
)
token = self._client.get_cached_access_token(scopes)
if not token:
token = await self._client.obtain_token_by_refresh_token(refresh_token, scopes, **kwargs)
if token:
return token

if not self._refresh_token:
self._refresh_token = get_credentials()
if not self._refresh_token:
raise CredentialUnavailableError(message="No Azure user is logged in to Visual Studio Code.")

token = await self._client.obtain_token_by_refresh_token(self._refresh_token, scopes, **kwargs)
return token
14 changes: 8 additions & 6 deletions sdk/identity/azure-identity/tests/test_vscode_credential.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from azure.core.pipeline.policies import SansIOHTTPPolicy
from azure.identity._internal.user_agent import USER_AGENT
from helpers import build_aad_response, mock_response, Request, validating_transport

try:
from unittest import mock
except ImportError: # python < 3.3
Expand Down Expand Up @@ -65,26 +66,27 @@ def test_redeem_token():
credential = VSCodeCredential(_client=mock_client)
token = credential.get_token("scope")
assert token is expected_token
mock_client.obtain_token_by_refresh_token.assert_called_with('VALUE', ('scope',))
mock_client.obtain_token_by_refresh_token.assert_called_with("VALUE", ("scope",))
assert mock_client.obtain_token_by_refresh_token.call_count == 1


@pytest.mark.skipif(not sys.platform.startswith('darwin'), reason="This test only runs on MacOS")
@pytest.mark.skipif(not sys.platform.startswith("darwin"), reason="This test only runs on MacOS")
def test_mac_keychain_valid_value():
with mock.patch('Keychain.get_generic_password', return_value="VALUE"):
with mock.patch("msal_extensions.osx.Keychain.get_generic_password", return_value="VALUE"):
assert get_credentials() == "VALUE"


@pytest.mark.skipif(not sys.platform.startswith('darwin'), reason="This test only runs on MacOS")
@pytest.mark.skipif(not sys.platform.startswith("darwin"), reason="This test only runs on MacOS")
def test_mac_keychain_error():
from msal_extensions.osx import Keychain, KeychainError
with mock.patch.object(Keychain, 'get_generic_password', side_effect=KeychainError()):

with mock.patch.object(Keychain, "get_generic_password", side_effect=KeychainError(-1)):
credential = VSCodeCredential()
with pytest.raises(CredentialUnavailableError):
token = credential.get_token("scope")


@pytest.mark.skipif(not sys.platform.startswith("linux"), reason="This test only runs on Linux")
def test_get_token():
with mock.patch('azure.identity._credentials.linux_vscode_adapter._get_refresh_token', return_value="VALUE"):
with mock.patch("azure.identity._credentials.linux_vscode_adapter._get_refresh_token", return_value="VALUE"):
assert get_credentials() == "VALUE"
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,4 @@ async def test_redeem_token():
credential = VSCodeCredential(_client=mock_client)
token = await credential.get_token("scope")
assert token is expected_token
token_by_refresh_token.assert_called_with("VALUE", ("scope",))