@@ -319,3 +319,83 @@ def test_only_client_capabilities_no_claims_merge(self):
319
319
320
320
def test_both_claims_and_capabilities_none (self ):
321
321
self .assertEqual (_merge_claims_challenge_and_capabilities (None , None ), None )
322
+
323
+
324
+ class TestApplicationForRefreshInBehaviors (unittest .TestCase ):
325
+ """The following test cases were based on design doc here
326
+ https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=%2FRefreshAtExpirationPercentage%2Foverview.md&version=GBdev&_a=preview&anchor=scenarios
327
+ """
328
+ def setUp (self ):
329
+ self .authority_url = "https://login.microsoftonline.com/common"
330
+ self .authority = msal .authority .Authority (
331
+ self .authority_url , MinimalHttpClient ())
332
+ self .scopes = ["s1" , "s2" ]
333
+ self .uid = "my_uid"
334
+ self .utid = "my_utid"
335
+ self .account = {"home_account_id" : "{}.{}" .format (self .uid , self .utid )}
336
+ self .rt = "this is a rt"
337
+ self .cache = msal .SerializableTokenCache ()
338
+ self .client_id = "my_app"
339
+ self .app = ClientApplication (
340
+ self .client_id , authority = self .authority_url , token_cache = self .cache )
341
+
342
+ def populate_cache (self , access_token = "at" , expires_in = 86400 , refresh_in = 43200 ):
343
+ self .cache .add ({
344
+ "client_id" : self .client_id ,
345
+ "scope" : self .scopes ,
346
+ "token_endpoint" : "{}/oauth2/v2.0/token" .format (self .authority_url ),
347
+ "response" : TokenCacheTestCase .build_response (
348
+ access_token = access_token ,
349
+ expires_in = expires_in , refresh_in = refresh_in ,
350
+ uid = self .uid , utid = self .utid , refresh_token = self .rt ),
351
+ })
352
+
353
+ def test_fresh_token_should_be_returned_from_cache (self ):
354
+ # a.k.a. Return unexpired token that is not above token refresh expiration threshold
355
+ access_token = "An access token prepopulated into cache"
356
+ self .populate_cache (access_token = access_token , expires_in = 900 , refresh_in = 450 )
357
+ self .assertEqual (
358
+ access_token ,
359
+ self .app .acquire_token_silent (['s1' ], self .account ).get ("access_token" ))
360
+
361
+ def test_aging_token_and_available_aad_should_return_new_token (self ):
362
+ # a.k.a. Attempt to refresh unexpired token when AAD available
363
+ self .populate_cache (access_token = "old AT" , expires_in = 3599 , refresh_in = - 1 )
364
+ new_access_token = "new AT"
365
+ self .app ._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family = (
366
+ lambda * args , ** kwargs : {"access_token" : new_access_token })
367
+ self .assertEqual (
368
+ new_access_token ,
369
+ self .app .acquire_token_silent (['s1' ], self .account ).get ("access_token" ))
370
+
371
+ def test_aging_token_and_unavailable_aad_should_return_old_token (self ):
372
+ # a.k.a. Attempt refresh unexpired token when AAD unavailable
373
+ old_at = "old AT"
374
+ self .populate_cache (access_token = old_at , expires_in = 3599 , refresh_in = - 1 )
375
+ self .app ._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family = (
376
+ lambda * args , ** kwargs : {"error" : "sth went wrong" })
377
+ self .assertEqual (
378
+ old_at ,
379
+ self .app .acquire_token_silent (['s1' ], self .account ).get ("access_token" ))
380
+
381
+ def test_expired_token_and_unavailable_aad_should_return_error (self ):
382
+ # a.k.a. Attempt refresh expired token when AAD unavailable
383
+ self .populate_cache (access_token = "expired at" , expires_in = - 1 , refresh_in = - 900 )
384
+ error = "something went wrong"
385
+ self .app ._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family = (
386
+ lambda * args , ** kwargs : {"error" : error })
387
+ self .assertEqual (
388
+ error ,
389
+ self .app .acquire_token_silent_with_error ( # This variant preserves error
390
+ ['s1' ], self .account ).get ("error" ))
391
+
392
+ def test_expired_token_and_available_aad_should_return_new_token (self ):
393
+ # a.k.a. Attempt refresh expired token when AAD available
394
+ self .populate_cache (access_token = "expired at" , expires_in = - 1 , refresh_in = - 900 )
395
+ new_access_token = "new AT"
396
+ self .app ._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family = (
397
+ lambda * args , ** kwargs : {"access_token" : new_access_token })
398
+ self .assertEqual (
399
+ new_access_token ,
400
+ self .app .acquire_token_silent (['s1' ], self .account ).get ("access_token" ))
401
+
0 commit comments