Skip to content

Commit

Permalink
Merge pull request #821 from WPI-LNL/master
Browse files Browse the repository at this point in the history
master to dev
  • Loading branch information
alextannenbaum authored Oct 23, 2023
2 parents 9c53393 + bf39c5b commit 995425f
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 46 deletions.
76 changes: 76 additions & 0 deletions accounts/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import json
import requests
import msal

from django.contrib.auth import get_user_model
from django.conf import settings


app = msal.ConfidentialClientApplication(
settings.GRAPH_API_CLIENT_ID, authority=settings.GRAPH_API_AUTHORITY,
client_credential=settings.GRAPH_API_SECRET,
)

def acquire_graph_access_token():
result = None

# Firstly, looks up a token from cache
# Since we are looking for token for the current app, NOT for an end user,
# notice we give account parameter as None.
result = app.acquire_token_silent([settings.GRAPH_API_SCOPE], account=None)

if not result:
print("No suitable token exists in cache. Let's get a new one from AAD.")
result = app.acquire_token_for_client(scopes=[settings.GRAPH_API_SCOPE])

if "access_token" in result:
return result['access_token']
else:
print(result.get("error"))
print(result.get("error_description"))
print(result.get("correlation_id")) # You may need this when reporting a bug
return None

def search_users(q):
token = acquire_graph_access_token()
results = requests.get(
settings.GRAPH_API_ENDPOINT,
headers={
'Authorization': f'Bearer {token}',
'ConsistencyLevel': 'eventual'
},
params={
'$select': 'givenName,surname,mail,mailNickname,employeeId,OnPremisesExtensionAttributes',
#'$search': f'displayName:{q}'
'$search': "\"displayName:" + q + "\""
}
)
return results

def search_or_create_users(q):
objs = []
graph_resp = search_users(q)
graph_resp = json.loads(graph_resp.text)
for graph_u in graph_resp["value"]:
if graph_u.get('givenName', '') is not None and graph_u.get('surname', '') is not None:
try:
class_year = graph_u.get('onPremisesExtensionAttributes').get('extensionAttribute2', [None])
except IndexError:
class_year = None
try:
class_year = int('20' + class_year) #Graph only provides final two digits of class year
except (ValueError, TypeError):
class_year = None
given_name = graph_u.get('givenName', '')
last_name = graph_u.get('surname', '')
u, created = get_user_model().objects.get_or_create(
username=graph_u['mailNickname'],
defaults={
'email': graph_u.get('mail', [False]) or graph_u['mailNickname'][0] + "@wpi.edu",
'first_name': given_name,
'last_name': last_name,
'class_year': class_year,
}
)
objs.append(u)
return objs
3 changes: 2 additions & 1 deletion accounts/lookups.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from django.db.models import Q

from . import ldap
from . import graph


class UserLookup(LookupChannel):
Expand All @@ -22,7 +23,7 @@ def get_query(self, q, request, search_ldap=True):
if (results or search_ldap is False):
return results

results = ldap.search_or_create_users(q)
results = graph.search_or_create_users(q)
if results: # call the thing again to ensure any prefetches
return self.get_query(q, request, search_ldap=False)
return []
Expand Down
88 changes: 44 additions & 44 deletions accounts/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,50 +481,50 @@ def test_mdc_name(self):
self.assertEqual(self.user.mdc_name, "PERSONS,TEST")


class LdapTestCase(test.TestCase):

def setUp(self):
# some user who is guaranteed to be on the ldap server. Like our club account.
self.some_user = models.User.objects.create(username="lnl")
Group.objects.create(name='Active').user_set.add(self.some_user)

def test_search(self):
# test that we get *some* results
resp = ldap.search_users("l")
self.assertGreater(len(resp), 0)

def test_search_or_create(self):
# test that we get *some* results, and that they are saved as users
test_query = "a"
resp = ldap.search_or_create_users(test_query)
self.assertGreater(len(resp), 0)
self.assertIsInstance(resp[0], models.User)

def test_fill(self):
# if a user is on the ldap server and lacking a name, it will retrieve it
u = ldap.fill_in_user(self.some_user)
self.assertNotEquals(u.first_name, "")

def test_fill_no_overwrite(self):
# if the user has a first and/or last name, fetching it won't overwrite it.
self.some_user.first_name = "foo" # does not match ldap name
self.some_user.save()

u = ldap.fill_in_user(self.some_user)
self.assertEquals(u.first_name, "foo")

def test_scrape_cmd(self):
output = StringIO()
management.call_command("scrape_ldap", stdout=output)
output = output.getvalue()

self.assertTrue(output.lower().startswith("1 user"),
msg="Ldap scraper returns '%s', expecting 1 user" % output)
# assert it detects our sole user needing info

self.some_user.refresh_from_db()
self.assertNotEquals(self.some_user.first_name, "")
# assert that it fill in some info and saves it.
# class LdapTestCase(test.TestCase):

# def setUp(self):
# # some user who is guaranteed to be on the ldap server. Like our club account.
# self.some_user = models.User.objects.create(username="lnl")
# Group.objects.create(name='Active').user_set.add(self.some_user)

# def test_search(self):
# # test that we get *some* results
# resp = ldap.search_users("l")
# self.assertGreater(len(resp), 0)

# def test_search_or_create(self):
# # test that we get *some* results, and that they are saved as users
# test_query = "a"
# resp = ldap.search_or_create_users(test_query)
# self.assertGreater(len(resp), 0)
# self.assertIsInstance(resp[0], models.User)

# def test_fill(self):
# # if a user is on the ldap server and lacking a name, it will retrieve it
# u = ldap.fill_in_user(self.some_user)
# self.assertNotEquals(u.first_name, "")

# def test_fill_no_overwrite(self):
# # if the user has a first and/or last name, fetching it won't overwrite it.
# self.some_user.first_name = "foo" # does not match ldap name
# self.some_user.save()

# u = ldap.fill_in_user(self.some_user)
# self.assertEquals(u.first_name, "foo")

# def test_scrape_cmd(self):
# output = StringIO()
# management.call_command("scrape_ldap", stdout=output)
# output = output.getvalue()

# self.assertTrue(output.lower().startswith("1 user"),
# msg="Ldap scraper returns '%s', expecting 1 user" % output)
# # assert it detects our sole user needing info

# self.some_user.refresh_from_db()
# self.assertNotEquals(self.some_user.first_name, "")
# # assert that it fill in some info and saves it.


class OfficerImgTestCase(test.TestCase):
Expand Down
6 changes: 6 additions & 0 deletions lnldb/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ def from_runtime(*x):
CCC_PASS = env.str("CCC_PASS", "")
SECRET_KEY = env.str("SECRET_KEY", "I am insecure.")

GRAPH_API_AUTHORITY = env.str('GRAPH_API_AUTHORITY', '')
GRAPH_API_CLIENT_ID = env.str('GRAPH_API_CLIENT_ID', '')
GRAPH_API_SCOPE = env.str('GRAPH_API_SCOPE', '')
GRAPH_API_SECRET = env.str('GRAPH_API_SECRET', '')
GRAPH_API_ENDPOINT = env.str('GRAPH_API_ENDPOINT', '')

SLACK_TOKEN = env.str('SLACK_BOT_TOKEN', None)

# If True, the bot will automatically attempt to join new channels when they are created in Slack
Expand Down
3 changes: 2 additions & 1 deletion requirements_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,5 @@ django-multiselectfield==0.1.12
drf-spectacular==0.21.0
spotipy==2.19.0
qrcode==7.3.1
levenshtein==0.21.1
levenshtein==0.21.1
msal==1.24.1

0 comments on commit 995425f

Please sign in to comment.