diff --git a/sdk/identity/azure-identity/conftest.py b/sdk/identity/azure-identity/conftest.py index d2e94c1e9cbd..ed71382fbb00 100644 --- a/sdk/identity/azure-identity/conftest.py +++ b/sdk/identity/azure-identity/conftest.py @@ -6,18 +6,40 @@ import sys import pytest -from azure.identity._constants import EnvironmentVariables +from azure.identity._constants import AZURE_CLI_CLIENT_ID, EnvironmentVariables -# IMDS tests must be run explicitly -collect_ignore_glob = ["*imds*"] # pylint:disable=invalid-name - -# Ignore collection of async tests on unsupported platforms +# Ignore async tests on unsupported platforms if sys.version_info < (3, 5): - collect_ignore_glob.append("*_async.py") + collect_ignore_glob = ["*_async.py"] + + +def pytest_addoption(parser): + parser.addoption("--manual", action="store_true", default=False, help="run manual tests") + + +def pytest_configure(config): + config.addinivalue_line("markers", "manual: mark test as requiring manual interaction") + config.addinivalue_line("markers", "prints: mark test as printing important information to stdout") + + +def pytest_collection_modifyitems(config, items): + stdout_captured = config.getoption("capture") != "no" + run_manual_tests = config.getoption("--manual") + if not stdout_captured and run_manual_tests: + return + + # skip manual tests or tests which print to stdout, as appropriate + skip_manual = pytest.mark.skip(reason="run pytest with '--manual' to run manual tests") + skip_prints = pytest.mark.skip(reason="this test prints to stdout, run pytest with '-s' to make output visible") + for test in items: + if not run_manual_tests and "manual" in test.keywords: + test.add_marker(skip_manual) + elif stdout_captured and "prints" in test.keywords: + test.add_marker(skip_prints) @pytest.fixture() -def live_identity_settings(): # pylint:disable=inconsistent-return-statements +def live_service_principal(): # pylint:disable=inconsistent-return-statements """Fixture for live Identity tests. Skips them when environment configuration is incomplete.""" missing_variables = [ @@ -40,7 +62,7 @@ def live_identity_settings(): # pylint:disable=inconsistent-return-statements @pytest.fixture() -def live_certificate_settings(live_identity_settings): # pylint:disable=inconsistent-return-statements,redefined-outer-name +def live_certificate(live_service_principal): # pylint:disable=inconsistent-return-statements,redefined-outer-name """Fixture for live tests needing a certificate. Skips them when environment configuration is incomplete. """ @@ -54,6 +76,25 @@ def live_certificate_settings(live_identity_settings): # pylint:disable=inconsi try: with open(pem_path, "w") as pem_file: pem_file.write(pem_content) - return dict(live_identity_settings, cert_path=pem_path) + return dict(live_service_principal, cert_path=pem_path) except IOError as ex: pytest.skip("Failed to write file '{}': {}".format(pem_path, ex)) + + +@pytest.fixture() +def live_user_details(): + user_details = { + "client_id": AZURE_CLI_CLIENT_ID, + "username": os.environ.get(EnvironmentVariables.AZURE_USERNAME), + "password": os.environ.get(EnvironmentVariables.AZURE_PASSWORD), + "tenant": os.environ.get("USER_TENANT"), + } + if None in user_details.values(): + pytest.skip("To test username/password authentication, set $AZURE_USERNAME, $AZURE_PASSWORD, $USER_TENANT") + else: + return user_details + + +@pytest.fixture() +def managed_identity_id(): + return os.environ.get("MANAGED_IDENTITY_ID") diff --git a/sdk/identity/azure-identity/tests/test_imds.py b/sdk/identity/azure-identity/tests/test_imds.py deleted file mode 100644 index df1eb937713d..000000000000 --- a/sdk/identity/azure-identity/tests/test_imds.py +++ /dev/null @@ -1,11 +0,0 @@ -# ------------------------------------ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. -# ------------------------------------ -from azure.identity._internal import ImdsCredential - - -def test_imds_credential(): - credential = ImdsCredential() - token = credential.get_token("https://management.azure.com/.default") - assert token diff --git a/sdk/identity/azure-identity/tests/test_imds_async.py b/sdk/identity/azure-identity/tests/test_imds_async.py deleted file mode 100644 index 2f8466f64565..000000000000 --- a/sdk/identity/azure-identity/tests/test_imds_async.py +++ /dev/null @@ -1,14 +0,0 @@ -# ------------------------------------ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. -# ------------------------------------ -import pytest - -from azure.identity.aio._internal import ImdsCredential - - -@pytest.mark.asyncio -async def test_imds_credential_async(): - credential = ImdsCredential() - token = await credential.get_token("https://management.azure.com/.default") - assert token diff --git a/sdk/identity/azure-identity/tests/test_live.py b/sdk/identity/azure-identity/tests/test_live.py index e5cf87c0c8a3..63548951ee5f 100644 --- a/sdk/identity/azure-identity/tests/test_live.py +++ b/sdk/identity/azure-identity/tests/test_live.py @@ -2,52 +2,116 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. # ------------------------------------ -from azure.identity import DefaultAzureCredential, CertificateCredential, ClientSecretCredential, KnownAuthorities +import os + +import pytest + +from azure.identity import ( + DefaultAzureCredential, + CertificateCredential, + ClientSecretCredential, + DeviceCodeCredential, + KnownAuthorities, + InteractiveBrowserCredential, + ManagedIdentityCredential, + UsernamePasswordCredential, +) +from azure.identity._constants import AZURE_CLI_CLIENT_ID, EnvironmentVariables +from azure.identity._credentials.managed_identity import ImdsCredential, MsiCredential from azure.identity._internal import ConfidentialClientCredential ARM_SCOPE = "https://management.azure.com/.default" -def test_certificate_credential(live_certificate_settings): - credential = CertificateCredential( - live_certificate_settings["tenant_id"], - live_certificate_settings["client_id"], - live_certificate_settings["cert_path"], - ) +def get_token(credential): token = credential.get_token(ARM_SCOPE) assert token assert token.token assert token.expires_on -def test_client_secret_credential(live_identity_settings): +def test_certificate_credential(live_certificate): + credential = CertificateCredential( + live_certificate["tenant_id"], live_certificate["client_id"], live_certificate["cert_path"] + ) + get_token(credential) + + +def test_client_secret_credential(live_service_principal): credential = ClientSecretCredential( - live_identity_settings["tenant_id"], - live_identity_settings["client_id"], - live_identity_settings["client_secret"], + live_service_principal["tenant_id"], + live_service_principal["client_id"], + live_service_principal["client_secret"], ) - token = credential.get_token(ARM_SCOPE) - assert token - assert token.token - assert token.expires_on + get_token(credential) -def test_default_credential(live_identity_settings): +def test_default_credential(live_service_principal): credential = DefaultAzureCredential() - token = credential.get_token(ARM_SCOPE) - assert token - assert token.token - assert token.expires_on + get_token(credential) -def test_confidential_client_credential(live_identity_settings): +def test_confidential_client_credential(live_service_principal): credential = ConfidentialClientCredential( - client_id=live_identity_settings["client_id"], - client_credential=live_identity_settings["client_secret"], + client_id=live_service_principal["client_id"], + client_credential=live_service_principal["client_secret"], authority=KnownAuthorities.AZURE_PUBLIC_CLOUD, - tenant_id=live_identity_settings["tenant_id"], + tenant_id=live_service_principal["tenant_id"], ) - token = credential.get_token(ARM_SCOPE) - assert token - assert token.token - assert token.expires_on + get_token(credential) + + +@pytest.mark.skipif("TEST_IMDS" not in os.environ, reason="To test IMDS authentication, set $TEST_IMDS with any value") +def test_imds_credential(managed_identity_id): + get_token(ImdsCredential()) + if managed_identity_id: + get_token(ImdsCredential(client_id=managed_identity_id)) + + +@pytest.mark.skipif( + EnvironmentVariables.MSI_ENDPOINT not in os.environ or EnvironmentVariables.MSI_SECRET in os.environ, + reason="Legacy MSI unavailable", +) +def test_msi_legacy(managed_identity_id): + get_token(MsiCredential()) + if managed_identity_id: + get_token(ImdsCredential(client_id=managed_identity_id)) + + +@pytest.mark.skipif( + EnvironmentVariables.MSI_ENDPOINT not in os.environ or EnvironmentVariables.MSI_SECRET not in os.environ, + reason="App Service MSI unavailable", +) +def test_msi_app_service(managed_identity_id): + get_token(MsiCredential()) + if managed_identity_id: + get_token(ImdsCredential(client_id=managed_identity_id)) + + +def test_username_password_auth(live_user_details): + credential = UsernamePasswordCredential( + client_id=live_user_details["client_id"], + username=live_user_details["username"], + password=live_user_details["password"], + tenant_id=live_user_details["tenant"], + ) + get_token(credential) + + +@pytest.mark.manual +@pytest.mark.prints +def test_device_code(): + import webbrowser + + def prompt(url, user_code, _): + print("opening a browser to '{}', enter device code {}".format(url, user_code)) + webbrowser.open_new_tab(url) + + credential = DeviceCodeCredential(client_id=AZURE_CLI_CLIENT_ID, prompt_callback=prompt, timeout=40) + get_token(credential) + + +@pytest.mark.manual +def test_browser_auth(): + credential = InteractiveBrowserCredential(client_id=AZURE_CLI_CLIENT_ID, timeout=40) + get_token(credential) diff --git a/sdk/identity/azure-identity/tests/test_live_async.py b/sdk/identity/azure-identity/tests/test_live_async.py index 9ede517c6a3f..13b26a721734 100644 --- a/sdk/identity/azure-identity/tests/test_live_async.py +++ b/sdk/identity/azure-identity/tests/test_live_async.py @@ -4,24 +4,16 @@ # ------------------------------------ import os -try: - from unittest import mock -except ImportError: # python < 3.3 - import mock # type: ignore +import pytest +from azure.identity._constants import EnvironmentVariables from azure.identity.aio import DefaultAzureCredential, CertificateCredential, ClientSecretCredential -import pytest +from azure.identity.aio._credentials.managed_identity import ImdsCredential, MsiCredential ARM_SCOPE = "https://management.azure.com/.default" -@pytest.mark.asyncio -async def test_certificate_credential(live_certificate_settings): - credential = CertificateCredential( - live_certificate_settings["tenant_id"], - live_certificate_settings["client_id"], - live_certificate_settings["cert_path"], - ) +async def get_token(credential): token = await credential.get_token(ARM_SCOPE) assert token assert token.token @@ -29,22 +21,48 @@ async def test_certificate_credential(live_certificate_settings): @pytest.mark.asyncio -async def test_client_secret_credential(live_identity_settings): +async def test_certificate_credential(live_certificate): + credential = CertificateCredential( + live_certificate["tenant_id"], live_certificate["client_id"], live_certificate["cert_path"] + ) + await get_token(credential) + + +@pytest.mark.asyncio +async def test_client_secret_credential(live_service_principal): credential = ClientSecretCredential( - live_identity_settings["tenant_id"], - live_identity_settings["client_id"], - live_identity_settings["client_secret"], + live_service_principal["tenant_id"], + live_service_principal["client_id"], + live_service_principal["client_secret"], ) - token = await credential.get_token(ARM_SCOPE) - assert token - assert token.token - assert token.expires_on + await get_token(credential) @pytest.mark.asyncio -async def test_default_credential(live_identity_settings): +async def test_default_credential(live_service_principal): credential = DefaultAzureCredential() - token = await credential.get_token(ARM_SCOPE) - assert token - assert token.token - assert token.expires_on + await get_token(credential) + + +@pytest.mark.skipif("TEST_IMDS" not in os.environ, reason="To test IMDS authentication, set $TEST_IMDS with any value") +@pytest.mark.asyncio +async def test_imds_credential(): + await get_token(ImdsCredential()) + + +@pytest.mark.skipif( + EnvironmentVariables.MSI_ENDPOINT not in os.environ or EnvironmentVariables.MSI_SECRET in os.environ, + reason="Legacy MSI unavailable", +) +@pytest.mark.asyncio +async def test_msi_legacy(): + await get_token(MsiCredential()) + + +@pytest.mark.skipif( + EnvironmentVariables.MSI_ENDPOINT not in os.environ or EnvironmentVariables.MSI_SECRET not in os.environ, + reason="App Service MSI unavailable", +) +@pytest.mark.asyncio +async def test_msi_app_service(): + await get_token(MsiCredential())