Skip to content
2 changes: 2 additions & 0 deletions sdk/identity/azure-identity/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 1.4.0b3 (Unreleased)

- Now `DefaultAzureCredential` can authenticate with the identity signed in to Visual
Studio Code's Azure extension. ([#10472](https://github.com/Azure/azure-sdk-for-python/issues/10472))

## 1.4.0b2 (2020-04-06)
- After an instance of `DefaultAzureCredential` successfully authenticates, it
Expand Down
1 change: 1 addition & 0 deletions sdk/identity/azure-identity/azure/identity/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
AZURE_VSCODE_CLIENT_ID = "aebc6443-996d-45c2-90f0-388ff96faa56"
VSCODE_CREDENTIALS_SECTION = "VS Code Azure"


class KnownAuthorities:
AZURE_CHINA = "login.chinacloudapi.cn"
AZURE_GERMANY = "login.microsoftonline.de"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,38 @@


def _c_str(string):
return ct.c_char_p(string.encode('utf-8'))
return ct.c_char_p(string.encode("utf-8"))


try:
_libsecret = ct.cdll.LoadLibrary('libsecret-1.so.0')
_libsecret.secret_schema_new.argtypes = \
[ct.c_char_p, ct.c_uint, ct.c_char_p, ct.c_uint, ct.c_char_p, ct.c_uint, ct.c_void_p]
_libsecret.secret_password_lookup_sync.argtypes = \
[ct.c_void_p, ct.c_void_p, ct.c_void_p, ct.c_char_p, ct.c_char_p, ct.c_char_p, ct.c_char_p, ct.c_void_p]
_libsecret = ct.cdll.LoadLibrary("libsecret-1.so.0")
_libsecret.secret_schema_new.argtypes = [
ct.c_char_p,
ct.c_uint,
ct.c_char_p,
ct.c_uint,
ct.c_char_p,
ct.c_uint,
ct.c_void_p,
]
_libsecret.secret_password_lookup_sync.argtypes = [
ct.c_void_p,
ct.c_void_p,
ct.c_void_p,
ct.c_char_p,
ct.c_char_p,
ct.c_char_p,
ct.c_char_p,
ct.c_void_p,
]
_libsecret.secret_password_lookup_sync.restype = ct.c_char_p
_libsecret.secret_schema_unref.argtypes = [ct.c_void_p]
except OSError:
_libsecret = None


def _get_user_settings_path():
app_data_folder = os.environ['HOME']
app_data_folder = os.environ["HOME"]
return os.path.join(app_data_folder, ".config", "Code", "User", "settings.json")


Expand All @@ -47,17 +62,27 @@ def _get_refresh_token(service_name, account_name):
# _libsecret.secret_password_lookup_sync raises segment fault on Python 2.7
# temporarily disable it on 2.7
import sys

if sys.version_info[0] < 3:
raise NotImplementedError("Not supported on Python 2.7")

err = ct.c_int()
schema = _libsecret.secret_schema_new(_c_str("org.freedesktop.Secret.Generic"), 2,
_c_str("service"), 0, _c_str("account"), 0, None)
p_str = _libsecret.secret_password_lookup_sync(schema, None, ct.byref(err), _c_str("service"), _c_str(service_name),
_c_str("account"), _c_str(account_name), None)
schema = _libsecret.secret_schema_new(
_c_str("org.freedesktop.Secret.Generic"), 2, _c_str("service"), 0, _c_str("account"), 0, None
)
p_str = _libsecret.secret_password_lookup_sync(
schema,
None,
ct.byref(err),
_c_str("service"),
_c_str(service_name),
_c_str("account"),
_c_str(account_name),
None,
)
_libsecret.secret_schema_unref(schema)
if err.value == 0:
return p_str.decode('utf-8')
return p_str.decode("utf-8")

return None

Expand All @@ -69,5 +94,5 @@ def get_credentials():
return credentials
except NotImplementedError: # pylint:disable=try-except-raise
raise
except Exception: #pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
return None
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


def _get_user_settings_path():
app_data_folder = os.environ['USER']
app_data_folder = os.environ["USER"]
return os.path.join(app_data_folder, "Library", "Application Support", "Code", "User", "settings.json")


Expand Down Expand Up @@ -37,5 +37,5 @@ def get_credentials():
environment_name = _get_user_settings()
credentials = _get_refresh_token(VSCODE_CREDENTIALS_SECTION, environment_name)
return credentials
except Exception: #pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
return None
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from .._exceptions import CredentialUnavailableError
from .._constants import AZURE_VSCODE_CLIENT_ID
from .._internal.aad_client import AadClient
if sys.platform.startswith('win'):

if sys.platform.startswith("win"):
from .win_vscode_adapter import get_credentials
elif sys.platform.startswith('darwin'):
elif sys.platform.startswith("darwin"):
from .macos_vscode_adapter import get_credentials
else:
from .linux_vscode_adapter import get_credentials
Expand All @@ -24,8 +25,10 @@ class VSCodeCredential(object):
"""Authenticates by redeeming a refresh token previously saved by VS Code

"""

def __init__(self, **kwargs):
self._client = kwargs.pop("_client", None) or AadClient("organizations", AZURE_VSCODE_CLIENT_ID, **kwargs)
self._refresh_token = None

def get_token(self, *scopes, **kwargs):
# type: (*str, **Any) -> AccessToken
Expand All @@ -43,11 +46,15 @@ def get_token(self, *scopes, **kwargs):
if not scopes:
raise ValueError("'get_token' requires at least one scope")

refresh_token = get_credentials()
if not refresh_token:
raise CredentialUnavailableError(
message="No Azure user is logged in to Visual Studio Code."
)
token = self._client.get_cached_access_token(scopes) \
or self._client.obtain_token_by_refresh_token(refresh_token, scopes, **kwargs)
token = self._client.get_cached_access_token(scopes)

if token:
return token

if not self._refresh_token:
self._refresh_token = get_credentials()
if not self._refresh_token:
raise CredentialUnavailableError(message="No Azure user is logged in to Visual Studio Code.")

token = self._client.obtain_token_by_refresh_token(self._refresh_token, scopes, **kwargs)
return token
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
import json
import ctypes as ct
from .._constants import VSCODE_CREDENTIALS_SECTION

try:
import ctypes.wintypes as wt
except (IOError, ValueError):
pass


SUPPORTED_CREDKEYS = set((
'Type', 'TargetName', 'Persist',
'UserName', 'Comment', 'CredentialBlob'))
SUPPORTED_CREDKEYS = set(("Type", "TargetName", "Persist", "UserName", "Comment", "CredentialBlob"))

_PBYTE = ct.POINTER(ct.c_byte)


class _CREDENTIAL(ct.Structure):
Expand All @@ -25,17 +26,18 @@ class _CREDENTIAL(ct.Structure):
("Comment", ct.c_wchar_p),
("LastWritten", wt.FILETIME),
("CredentialBlobSize", wt.DWORD),
("CredentialBlob", wt.LPBYTE),
("CredentialBlob", _PBYTE),
("Persist", wt.DWORD),
("AttributeCount", wt.DWORD),
("Attributes", ct.c_void_p),
("TargetAlias", ct.c_wchar_p),
("UserName", ct.c_wchar_p)]
("UserName", ct.c_wchar_p),
]


_PCREDENTIAL = ct.POINTER(_CREDENTIAL)

_advapi = ct.WinDLL('advapi32')
_advapi = ct.WinDLL("advapi32")
_advapi.CredReadW.argtypes = [wt.LPCWSTR, wt.DWORD, wt.DWORD, ct.POINTER(_PCREDENTIAL)]
_advapi.CredReadW.restype = wt.BOOL
_advapi.CredFree.argtypes = [_PCREDENTIAL]
Expand All @@ -47,16 +49,15 @@ def _read_credential(service_name, account_name):
if _advapi.CredReadW(target, 1, 0, ct.byref(cred_ptr)):
cred_blob = cred_ptr.contents.CredentialBlob
cred_blob_size = cred_ptr.contents.CredentialBlobSize
password_as_list = [int.from_bytes(cred_blob[pos:pos + 1], 'little')
for pos in range(0, cred_blob_size)]
cred = ''.join(map(chr, password_as_list))
password_as_list = [int.from_bytes(cred_blob[pos : pos + 1], "little") for pos in range(0, cred_blob_size)]
cred = "".join(map(chr, password_as_list))
_advapi.CredFree(cred_ptr)
return cred
return None


def _get_user_settings_path():
app_data_folder = os.environ['APPDATA']
app_data_folder = os.environ["APPDATA"]
return os.path.join(app_data_folder, "Code", "User", "settings.json")


Expand All @@ -80,5 +81,5 @@ def get_credentials():
environment_name = _get_user_settings()
credentials = _get_refresh_token(VSCODE_CREDENTIALS_SECTION, environment_name)
return credentials
except Exception: #pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
return None
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@
from ..._constants import AZURE_VSCODE_CLIENT_ID
from .._internal.aad_client import AadClient
from ..._credentials.vscode_credential import get_credentials

if TYPE_CHECKING:
# pylint:disable=unused-import,ungrouped-imports
from typing import Any, Iterable, Optional
from azure.core.credentials import AccessToken


class VSCodeCredential(AsyncCredentialBase):
"""Authenticates by redeeming a refresh token previously saved by VS Code

"""

def __init__(self, **kwargs):
self._client = kwargs.pop("_client", None) or AadClient("organizations", AZURE_VSCODE_CLIENT_ID, **kwargs)
self._refresh_token = None

async def __aenter__(self):
if self._client:
Expand Down Expand Up @@ -47,12 +51,14 @@ async def get_token(self, *scopes, **kwargs):
if not scopes:
raise ValueError("'get_token' requires at least one scope")

refresh_token = get_credentials()
if not refresh_token:
raise CredentialUnavailableError(
message="No Azure user is logged in to Visual Studio Code."
)
token = self._client.get_cached_access_token(scopes)
if not token:
token = await self._client.obtain_token_by_refresh_token(refresh_token, scopes, **kwargs)
if token:
return token

if not self._refresh_token:
self._refresh_token = get_credentials()
if not self._refresh_token:
raise CredentialUnavailableError(message="No Azure user is logged in to Visual Studio Code.")

token = await self._client.obtain_token_by_refresh_token(self._refresh_token, scopes, **kwargs)
return token
45 changes: 39 additions & 6 deletions sdk/identity/azure-identity/tests/test_vscode_credential.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from azure.core.pipeline.policies import SansIOHTTPPolicy
from azure.identity._internal.user_agent import USER_AGENT
from helpers import build_aad_response, mock_response, Request, validating_transport

try:
from unittest import mock
except ImportError: # python < 3.3
Expand Down Expand Up @@ -65,26 +66,58 @@ def test_redeem_token():
credential = VSCodeCredential(_client=mock_client)
token = credential.get_token("scope")
assert token is expected_token
mock_client.obtain_token_by_refresh_token.assert_called_with('VALUE', ('scope',))
mock_client.obtain_token_by_refresh_token.assert_called_with("VALUE", ("scope",))
assert mock_client.obtain_token_by_refresh_token.call_count == 1


@pytest.mark.skipif(not sys.platform.startswith('darwin'), reason="This test only runs on MacOS")
def test_cache_refresh_token():
expected_token = AccessToken("token", 42)

mock_client = mock.Mock(spec=object)
mock_client.obtain_token_by_refresh_token = mock.Mock(return_value=expected_token)
mock_client.get_cached_access_token = mock.Mock(return_value=None)
mock_get_credentials = mock.Mock(return_value="VALUE")

with mock.patch(VSCodeCredential.__module__ + ".get_credentials", mock_get_credentials):
credential = VSCodeCredential(_client=mock_client)
token = credential.get_token("scope")
assert token is expected_token
assert mock_get_credentials.call_count == 1
token = credential.get_token("scope")
assert token is expected_token
assert mock_get_credentials.call_count == 1


def test_no_obtain_token_if_cached():
expected_token = AccessToken("token", 42)

mock_client = mock.Mock(spec=object)
mock_client.obtain_token_by_refresh_token = mock.Mock(return_value=expected_token)
mock_client.get_cached_access_token = mock.Mock(return_value='VALUE')

with mock.patch(VSCodeCredential.__module__ + ".get_credentials", return_value="VALUE"):
credential = VSCodeCredential(_client=mock_client)
token = credential.get_token("scope")
assert mock_client.obtain_token_by_refresh_token.call_count == 0


@pytest.mark.skipif(not sys.platform.startswith("darwin"), reason="This test only runs on MacOS")
def test_mac_keychain_valid_value():
with mock.patch('Keychain.get_generic_password', return_value="VALUE"):
with mock.patch("msal_extensions.osx.Keychain.get_generic_password", return_value="VALUE"):
assert get_credentials() == "VALUE"


@pytest.mark.skipif(not sys.platform.startswith('darwin'), reason="This test only runs on MacOS")
@pytest.mark.skipif(not sys.platform.startswith("darwin"), reason="This test only runs on MacOS")
def test_mac_keychain_error():
from msal_extensions.osx import Keychain, KeychainError
with mock.patch.object(Keychain, 'get_generic_password', side_effect=KeychainError()):

with mock.patch.object(Keychain, "get_generic_password", side_effect=KeychainError(-1)):
credential = VSCodeCredential()
with pytest.raises(CredentialUnavailableError):
token = credential.get_token("scope")


@pytest.mark.skipif(not sys.platform.startswith("linux"), reason="This test only runs on Linux")
def test_get_token():
with mock.patch('azure.identity._credentials.linux_vscode_adapter._get_refresh_token', return_value="VALUE"):
with mock.patch("azure.identity._credentials.linux_vscode_adapter._get_refresh_token", return_value="VALUE"):
assert get_credentials() == "VALUE"
33 changes: 33 additions & 0 deletions sdk/identity/azure-identity/tests/test_vscode_credential_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,36 @@ async def test_redeem_token():
credential = VSCodeCredential(_client=mock_client)
token = await credential.get_token("scope")
assert token is expected_token
token_by_refresh_token.assert_called_with("VALUE", ("scope",))

@pytest.mark.asyncio
async def test_cache_refresh_token():
expected_token = AccessToken("token", 42)

mock_client = mock.Mock(spec=object)
token_by_refresh_token = mock.Mock(return_value=expected_token)
mock_client.obtain_token_by_refresh_token = wrap_in_future(token_by_refresh_token)
mock_client.get_cached_access_token = mock.Mock(return_value=None)
mock_get_credentials = mock.Mock(return_value="VALUE")

with mock.patch(VSCodeCredential.__module__ + ".get_credentials", mock_get_credentials):
credential = VSCodeCredential(_client=mock_client)
token = await credential.get_token("scope")
assert mock_get_credentials.call_count == 1
token = await credential.get_token("scope")
assert mock_get_credentials.call_count == 1


@pytest.mark.asyncio
async def test_no_obtain_token_if_cached():
expected_token = AccessToken("token", 42)

mock_client = mock.Mock(spec=object)
token_by_refresh_token = mock.Mock(return_value=expected_token)
mock_client.obtain_token_by_refresh_token = wrap_in_future(token_by_refresh_token)
mock_client.get_cached_access_token = mock.Mock(return_value='VALUE')

with mock.patch(VSCodeCredential.__module__ + ".get_credentials", return_value="VALUE"):
credential = VSCodeCredential(_client=mock_client)
token = await credential.get_token("scope")
assert token_by_refresh_token.call_count == 0