Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions msal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def decorate_scope(
return list(decorated)



class ClientApplication(object):

def __init__(
Expand All @@ -59,7 +60,7 @@ def __init__(
verify=True, proxies=None, timeout=None):
"""Create an instance of application.

:param client_id: Your app has a clinet_id after you register it on AAD.
:param client_id: Your app has a client_id after you register it on AAD.
:param client_credential:
For :class:`PublicClientApplication`, you simply use `None` here.
For :class:`ConfidentialClientApplication`,
Expand All @@ -69,8 +70,11 @@ def __init__(
{
"private_key": "...-----BEGIN PRIVATE KEY-----...",
"thumbprint": "A1B2C3D4E5F6...",
"public_certificate": "...-----BEGIN CERTIFICATE-----..." (Only to be sent when using Subject Name Issuer Authentication)
Comment thread
rayluo marked this conversation as resolved.
Outdated
}

public_certificate (optional) can be a public key certificate or certificate chain which is sent through
Comment thread
rayluo marked this conversation as resolved.
Outdated
'x5c' JWT header only for subject name and issuer based authentication to support cert auto rolls
:param str authority:
A URL that identifies a token authority. It should be of the format
https://login.microsoftonline.com/your_tenant
Expand Down Expand Up @@ -106,16 +110,27 @@ def __init__(
self.client = self._build_client(client_credential, self.authority)
self.authority_groups = None

def _extract_x5c_value(self):
Comment thread
rayluo marked this conversation as resolved.
Outdated
public_certificates = re.findall(
r'\-+BEGIN CERTIFICATE.+\-+(?P<cert_value>[^-]+)\-+END CERTIFICATE.+\-+',
self.client_credential['public_certificate'], re.I) # We send x5c as list of strings
if len(public_certificates):
return [cert.strip() for cert in public_certificates]
return [self.client_credential['public_certificate'].strip()]

def _build_client(self, client_credential, authority):
client_assertion = None
client_assertion_type = None
default_body = {"client_info": 1}
headers = {}
Comment thread
rayluo marked this conversation as resolved.
Outdated
if isinstance(client_credential, dict):
assert ("private_key" in client_credential
and "thumbprint" in client_credential)
if 'public_certificate' in client_credential:
headers["x5c"] = self._extract_x5c_value()
signer = JwtSigner(
client_credential["private_key"], algorithm="RS256",
sha1_thumbprint=client_credential.get("thumbprint"))
sha1_thumbprint=client_credential.get("thumbprint"), headers=headers)
client_assertion = signer.sign_assertion(
audience=authority.token_endpoint, issuer=self.client_id)
client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT
Expand Down
70 changes: 70 additions & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,76 @@ def test_client_certificate(self):
self.assertIn('access_token', result)
self.assertCacheWorks(result, app.acquire_token_silent(scope, account=None))

@unittest.skipUnless("public_certificate" in CONFIG, "Missing Public cert")
def test_subject_name_issuer_authentication_input_with_begin_certificate_and_end_certificate_single(self):
assert ("private_key_file" in CONFIG
and "thumbprint" in CONFIG and "public_certificate" in CONFIG)
key_path = os.path.join(THIS_FOLDER, CONFIG['private_key_file'])
public_certificate = os.path.join(THIS_FOLDER, CONFIG['public_certificate'])
with open(key_path) as f:
pem = f.read()
with open(public_certificate) as f:
public_certificate = f.read()
app = ConfidentialClientApplication(
CONFIG['client_id'],
client_credential={"private_key": pem, "thumbprint": CONFIG["thumbprint"],
"public_certificate": public_certificate})
x5c = app._extract_x5c_value()
self.assertIsInstance(x5c, list)
self.assertEquals(len(x5c), 1)

@unittest.skipUnless("public_certificate" in CONFIG, "Missing Public cert")
def test_subject_name_issuer_authentication_input_with_begin_certificate_and_end_certificate_multiple(self):
assert ("private_key_file" in CONFIG
and "thumbprint" in CONFIG and "public_certificate" in CONFIG)
key_path = os.path.join(THIS_FOLDER, CONFIG['private_key_file'])
public_certificate = os.path.join(THIS_FOLDER, CONFIG['public_certificate'])
with open(key_path) as f:
pem = f.read()
with open(public_certificate) as f:
public_certificate = f.read()
app = ConfidentialClientApplication(
CONFIG['client_id'],
{"private_key": pem, "thumbprint": CONFIG["thumbprint"], "public_certificate": public_certificate})
x5c = app._extract_x5c_value()
self.assertIsInstance(x5c, list)
self.assertGreater(len(x5c), 1)

@unittest.skipUnless("public_certificate" in CONFIG, "Missing Public cert")
def test_subject_name_issuer_authentication_input_without_begin_certificate_and_end_certificate(self):
assert ("private_key_file" in CONFIG
and "thumbprint" in CONFIG and "public_certificate" in CONFIG)
key_path = os.path.join(THIS_FOLDER, CONFIG['private_key_file'])
public_certificate = os.path.join(THIS_FOLDER, CONFIG['public_certificate'])
with open(key_path) as f:
pem = f.read()
with open(public_certificate) as f:
public_certificate = f.read()
app = ConfidentialClientApplication(
CONFIG['client_id'],
{"private_key": pem, "thumbprint": CONFIG["thumbprint"], "public_certificate": public_certificate})
x5c = app._extract_x5c_value()
self.assertIsInstance(x5c, list)
self.assertEquals(len(x5c), 1)

@unittest.skipUnless("public_certificate" in CONFIG, "Missing Public cert")
def test_subject_name_issuer_authentication(self):
assert ("private_key_file" in CONFIG
and "thumbprint" in CONFIG and "public_certificate" in CONFIG)
key_path = os.path.join(THIS_FOLDER, CONFIG['private_key_file'])
public_certificate = os.path.join(THIS_FOLDER, CONFIG['public_certificate'])
with open(key_path) as f:
pem = f.read()
with open(public_certificate) as f:
Comment thread
rayluo marked this conversation as resolved.
Outdated
public_certificate = f.read()
app = ConfidentialClientApplication(
CONFIG['client_id'], authority=CONFIG["authority"],
client_credential={"private_key": pem, "thumbprint": CONFIG["thumbprint"],
"public_certificate": public_certificate})
scope = CONFIG.get("scope", [])
result = app.acquire_token_for_client(scope)
self.assertIn('access_token', result)
self.assertCacheWorks(result, app.acquire_token_silent(scope, account=None))

@unittest.skipUnless("client_id" in CONFIG, "client_id missing")
class TestPublicClientApplication(Oauth2TestCase):
Expand Down