Skip to content

Commit

Permalink
Refactor test infrastructure to expose a known bug
Browse files Browse the repository at this point in the history
Apply the refactor to similar code path
  • Loading branch information
rayluo committed Sep 30, 2022
1 parent 28b45a3 commit 3d6e977
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 35 deletions.
5 changes: 4 additions & 1 deletion msal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -1857,6 +1857,7 @@ def _acquire_token_interactive_via_broker(
if login_hint and prompt != "select_account": # OIDC prompts when the user did not sign in
accounts = self.get_accounts(username=login_hint)
if len(accounts) == 1: # Unambiguously proceed with this account
logger.debug("Calling broker._acquire_token_silently()")
response = _acquire_token_silently( # When it works, it bypasses prompt
authority,
self.client_id,
Expand All @@ -1868,6 +1869,7 @@ def _acquire_token_interactive_via_broker(
return self._process_broker_response(response, scopes, data)
# login_hint undecisive or not exists
if prompt == "none" or not prompt: # Must/Can attempt _signin_silently()
logger.debug("Calling broker._signin_silently()")
response = _signin_silently( # Unlike OIDC, it doesn't honor login_hint
authority, self.client_id, scopes,
validateAuthority=validate_authority,
Expand Down Expand Up @@ -1903,7 +1905,8 @@ def _acquire_token_interactive_via_broker(
pass # It will fall back to the _signin_interactively()
else:
return self._process_broker_response(response, scopes, data)
# Falls back to _signin_interactively()

logger.debug("Falls back to broker._signin_interactively()")
on_before_launching_ui(ui="broker")
response = _signin_interactively(
authority, self.client_id, scopes,
Expand Down
2 changes: 1 addition & 1 deletion msal/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def _signin_silently(
def _signin_interactively(
authority, client_id, scopes,
parent_window_handle, # None means auto-detect for console apps
prompt=None,
prompt=None, # Note: This function does not really use this parameter
login_hint=None,
claims=None,
correlation_id=None,
Expand Down
113 changes: 80 additions & 33 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,25 @@ def _get_app_and_auth_code(
assert ac is not None
return (app, ac, redirect_uri)

def _render(url, description=None):
# Render a url in html if description is available, otherwise return url as-is
return "<a href='{url}' target=_blank>{description}</a>".format(
url=url, description=description) if description else url


def _get_hint(html_mode=None, username=None, lab_name=None, username_uri=None):
return "Sign in with {user} whose password is available from {lab}".format(
user=("<b>{}</b>".format(username) if html_mode else username)
if username
else "the upn from {}".format(_render(
username_uri, description="here" if html_mode else None)),
lab=_render(
"https://aka.ms/GetLabUserSecret?Secret=" + (lab_name or "msidlabXYZ"),
description="this password api" if html_mode else None,
),
)


@unittest.skipIf(os.getenv("TRAVIS_TAG"), "Skip e2e tests during tagged release")
class E2eTestCase(unittest.TestCase):

Expand Down Expand Up @@ -212,25 +231,31 @@ def _test_device_flow(

def _test_acquire_token_interactive(
self, client_id=None, authority=None, scope=None, port=None,
username_uri="", # But you would want to provide one
username=None, lab_name=None,
username_uri="", # Unnecessary if you provided username and lab_name
data=None, # Needed by ssh-cert feature
prompt=None,
enable_msa_passthrough=None,
**ignored):
assert client_id and authority and scope
self.app = self._build_app(client_id, authority=authority)
logger.info(_get_hint( # Useful when testing broker which shows no welcome_template
username=username, lab_name=lab_name, username_uri=username_uri))
result = self.app.acquire_token_interactive(
scope,
login_hint=username,
prompt=prompt,
timeout=120,
port=port,
parent_window_handle=self.app.CONSOLE_WINDOW_HANDLE, # This test app is a console app
enable_msa_passthrough=enable_msa_passthrough, # Needed when testing MSA-PT app
welcome_template= # This is an undocumented feature for testing
"""<html><body><h1>{id}</h1><ol>
<li>Get a username from the upn shown at <a href="{username_uri}">here</a></li>
<li>Get its password from https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
(replace the lab name with the labName from the link above).</li>
<li>{hint}</li>
<li><a href="$auth_uri">Sign In</a> or <a href="$abort_uri">Abort</a></li>
</ol></body></html>""".format(id=self.id(), username_uri=username_uri),
</ol></body></html>""".format(id=self.id(), hint=_get_hint(
html_mode=True,
username=username, lab_name=lab_name, username_uri=username_uri)),
data=data or {},
)
self.assertIn(
Expand All @@ -239,6 +264,11 @@ def _test_acquire_token_interactive(
# Note: No interpolation here, cause error won't always present
error=result.get("error"),
error_description=result.get("error_description")))
if username and result.get("id_token_claims", {}).get("preferred_username"):
self.assertEqual(
username, result["id_token_claims"]["preferred_username"],
"You are expected to sign in as account {}, but tokens returned is for {}".format(
username, result["id_token_claims"]["preferred_username"]))
self.assertCacheWorksForUser(result, scope, username=None, data=data or {})
return result # For further testing

Expand All @@ -260,7 +290,7 @@ def test_ssh_cert_for_service_principal(self):
self.assertEqual("ssh-cert", result["token_type"])

@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_ssh_cert_for_user(self):
def test_ssh_cert_for_user_should_work_with_any_account(self):
result = self._test_acquire_token_interactive(
client_id="04b07795-8ddb-461a-bbee-02f9e1bf7b46", # Azure CLI is one
# of the only 2 clients that are PreAuthz to use ssh cert feature
Expand Down Expand Up @@ -555,7 +585,8 @@ def _test_acquire_token_by_auth_code(

def _test_acquire_token_by_auth_code_flow(
self, client_id=None, authority=None, port=None, scope=None,
username_uri="", # But you would want to provide one
username=None, lab_name=None,
username_uri="", # Optional if you provided username and lab_name
**ignored):
assert client_id and authority and scope
self.app = msal.ClientApplication(
Expand All @@ -568,11 +599,11 @@ def _test_acquire_token_by_auth_code_flow(
auth_response = receiver.get_auth_response(
auth_uri=flow["auth_uri"], state=flow["state"], timeout=60,
welcome_template="""<html><body><h1>{id}</h1><ol>
<li>Get a username from the upn shown at <a href="{username_uri}">here</a></li>
<li>Get its password from https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
(replace the lab name with the labName from the link above).</li>
<li>{hint}</li>
<li><a href="$auth_uri">Sign In</a> or <a href="$abort_uri">Abort</a></li>
</ol></body></html>""".format(id=self.id(), username_uri=username_uri),
</ol></body></html>""".format(id=self.id(), hint=_get_hint(
html_mode=True,
username=username, lab_name=lab_name, username_uri=username_uri)),
)
if auth_response is None:
self.skipTest("Timed out. Did not have test settings in hand? Prepare and retry.")
Expand All @@ -592,6 +623,11 @@ def _test_acquire_token_by_auth_code_flow(
# Note: No interpolation here, cause error won't always present
error=result.get("error"),
error_description=result.get("error_description")))
if username and result.get("id_token_claims", {}).get("preferred_username"):
self.assertEqual(
username, result["id_token_claims"]["preferred_username"],
"You are expected to sign in as account {}, but tokens returned is for {}".format(
username, result["id_token_claims"]["preferred_username"]))
self.assertCacheWorksForUser(result, scope, username=None)

def _test_acquire_token_obo(self, config_pca, config_cca,
Expand Down Expand Up @@ -689,10 +725,23 @@ def test_adfs2019_fed_user(self):

@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_cloud_acquire_token_interactive(self):
config = self.get_lab_user(usertype="cloud")
self._test_acquire_token_interactive(
username_uri="https://msidlab.com/api/user?usertype=cloud",
**config)
self._test_acquire_token_interactive(**self.get_lab_user(usertype="cloud"))

@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_msa_pt_app_signin_via_organizations_authority_without_login_hint(self):
"""There is/was an upstream bug. See test case full docstring for the details.
When a MSAL-PT flow that account control is launched, user has 2+ AAD accounts in WAM,
selects an AAD account that is NOT the default AAD account from the OS,
it will incorrectly get tokens for default AAD account.
"""
self._test_acquire_token_interactive(**dict(
self.get_lab_user(usertype="cloud"), # This is generally not the current laptop's default AAD account
authority="https://login.microsoftonline.com/organizations",
client_id="04b07795-8ddb-461a-bbee-02f9e1bf7b46", # Azure CLI is an MSA-PT app
enable_msa_passthrough=True,
prompt="select_account", # In MSAL Python, this resets login_hint
))

def test_ropc_adfs2019_onprem(self):
# Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259
Expand All @@ -719,22 +768,22 @@ def test_adfs2019_onprem_acquire_token_by_auth_code(self):
@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_adfs2019_onprem_acquire_token_by_auth_code_flow(self):
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"]
config["scope"] = self.adfs2019_scopes
config["port"] = 8080
self._test_acquire_token_by_auth_code_flow(
username_uri="https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019",
**config)
self._test_acquire_token_by_auth_code_flow(**dict(
config,
authority="https://fs.%s.com/adfs" % config["lab_name"],
scope=self.adfs2019_scopes,
port=8080,
))

@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_adfs2019_onprem_acquire_token_interactive(self):
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"]
config["scope"] = self.adfs2019_scopes
config["port"] = 8080
self._test_acquire_token_interactive(
username_uri="https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019",
**config)
self._test_acquire_token_interactive(**dict(
config,
authority="https://fs.%s.com/adfs" % config["lab_name"],
scope=self.adfs2019_scopes,
port=8080,
))

@unittest.skipUnless(
os.getenv("LAB_OBO_CLIENT_SECRET"),
Expand Down Expand Up @@ -816,14 +865,12 @@ def test_b2c_acquire_token_by_auth_code(self):

@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_b2c_acquire_token_by_auth_code_flow(self):
config = self.get_lab_app_object(azureenvironment="azureb2ccloud")
self._test_acquire_token_by_auth_code_flow(
self._test_acquire_token_by_auth_code_flow(**dict(
self.get_lab_user(usertype="b2c", b2cprovider="local"),
authority=self._build_b2c_authority("B2C_1_SignInPolicy"),
client_id=config["appId"],
port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000]
scope=config["scopes"],
username_uri="https://msidlab.com/api/user?usertype=b2c&b2cprovider=local",
)
scope=self.get_lab_app_object(azureenvironment="azureb2ccloud")["scopes"],
))

def test_b2c_acquire_token_by_ropc(self):
config = self.get_lab_app_object(azureenvironment="azureb2ccloud")
Expand Down

0 comments on commit 3d6e977

Please sign in to comment.