-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IDP token introspection #229
base: main
Are you sure you want to change the base?
Changes from 19 commits
5dc3979
79303c2
c8e680f
4f849c3
12467a8
c92ebaa
d4d4723
e886157
0ee1973
99997bd
973985d
6f7fd4a
bb0dc20
c9362d9
5034867
b30e297
23832a4
42f32e3
cb16477
a7d824e
bb1049c
8069a7c
1876c32
8074432
ddd3e82
df4ca4b
9385c16
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Any, Dict, Optional | ||
|
||
import jwt | ||
|
||
from stytch.consumer.models.idp import AccessTokenJWTClaims, AccessTokenJWTResponse | ||
from stytch.core.api_base import ApiBase | ||
from stytch.core.http.client import AsyncClient, SyncClient | ||
from stytch.shared import jwt_helpers | ||
|
||
|
||
class IDP: | ||
def __init__( | ||
self, | ||
api_base: ApiBase, | ||
sync_client: SyncClient, | ||
async_client: AsyncClient, | ||
jwks_client: jwt.PyJWKClient, | ||
project_id: str, | ||
) -> None: | ||
self.api_base = api_base | ||
self.sync_client = sync_client | ||
self.async_client = async_client | ||
self.jwks_client = jwks_client | ||
self.project_id = project_id | ||
|
||
def introspect_idp_access_token( | ||
self, | ||
access_token: str, | ||
client_id: str, | ||
client_secret: Optional[str] = None, | ||
token_type_hint: str = "access_token", | ||
) -> Optional[AccessTokenJWTClaims]: | ||
return self.introspect_idp_access_token_local( | ||
access_token, client_id | ||
) or self.introspect_idp_access_token_network( | ||
access_token, client_id, client_secret, token_type_hint | ||
) | ||
|
||
async def introspect_idp_access_token_async( | ||
self, | ||
access_token: str, | ||
client_id: str, | ||
client_secret: Optional[str] = None, | ||
token_type_hint: str = "access_token", | ||
) -> Optional[AccessTokenJWTClaims]: | ||
local_introspection_response = self.introspect_idp_access_token_local(access_token, client_id) | ||
if local_introspection_response is not None: | ||
return local_introspection_response | ||
return await self.introspect_idp_access_token_network_async( | ||
access_token, client_id, client_secret, token_type_hint | ||
) | ||
|
||
def introspect_idp_access_token_network( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Naming nits:
I think |
||
self, | ||
access_token: str, | ||
client_id: str, | ||
client_secret: Optional[str] = None, | ||
token_type_hint: str = "access_token", | ||
) -> Optional[AccessTokenJWTClaims]: | ||
headers: Dict[str, str] = {"Content-Type": "application/x-www-form-urlencoded"} | ||
data: Dict[str, Any] = { | ||
"token": access_token, | ||
"client_id": client_id, | ||
"token_type_hint": token_type_hint, | ||
} | ||
if client_secret is not None: | ||
data["client_secret"] = client_secret | ||
|
||
url = self.api_base.url_for( | ||
f"/v1/public/{self.project_id}/oauth2/introspect", data | ||
) | ||
res = self.sync_client.postForm(url, data, headers) | ||
jwtResponse = AccessTokenJWTResponse.from_json( | ||
res.response.status_code, res.json | ||
) | ||
if not jwtResponse.active: | ||
return None | ||
return AccessTokenJWTClaims( | ||
subject=jwtResponse.sub, | ||
scope=jwtResponse.scope, | ||
custom_claims={}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should return custom claims that we retrieve from the introspection response here |
||
audience=jwtResponse.aud, | ||
expires_at=jwtResponse.exp, | ||
issued_at=jwtResponse.iat, | ||
issuer=jwtResponse.iss, | ||
not_before=jwtResponse.nbf, | ||
) | ||
|
||
async def introspect_idp_access_token_network_async( | ||
self, | ||
access_token: str, | ||
client_id: str, | ||
client_secret: Optional[str] = None, | ||
token_type_hint: str = "access_token", | ||
) -> Optional[AccessTokenJWTClaims]: | ||
headers: Dict[str, str] = {"Content-Type": "application/x-www-form-urlencoded"} | ||
data: Dict[str, Any] = { | ||
"token": access_token, | ||
"client_id": client_id, | ||
"token_type_hint": token_type_hint, | ||
} | ||
if client_secret is not None: | ||
data["client_secret"] = client_secret | ||
|
||
url = self.api_base.url_for( | ||
f"/v1/public/{self.project_id}/oauth2/introspect", data | ||
) | ||
res = await self.async_client.postForm(url, data, headers) | ||
jwtResponse = AccessTokenJWTResponse.from_json( | ||
res.response.status, res.json | ||
) | ||
if not jwtResponse.active: | ||
return None | ||
return AccessTokenJWTClaims( | ||
subject=jwtResponse.sub, | ||
scope=jwtResponse.scope, | ||
custom_claims={}, | ||
audience=jwtResponse.aud, | ||
expires_at=jwtResponse.exp, | ||
issued_at=jwtResponse.iat, | ||
issuer=jwtResponse.iss, | ||
not_before=jwtResponse.nbf, | ||
) | ||
|
||
def introspect_idp_access_token_local( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in contrast this method is only valid for access tokens |
||
self, | ||
access_token: str, | ||
client_id: str, | ||
) -> Optional[AccessTokenJWTClaims]: | ||
_scope_claim = "scope" | ||
generic_claims = jwt_helpers.authenticate_jwt_local( | ||
project_id=self.project_id, | ||
jwks_client=self.jwks_client, | ||
jwt=access_token, | ||
custom_audience=client_id, | ||
custom_issuer=f"https://stytch.com/{self.project_id}", | ||
) | ||
if generic_claims is None: | ||
return None | ||
|
||
custom_claims = { | ||
k: v for k, v in generic_claims.untyped_claims.items() if k != _scope_claim | ||
} | ||
|
||
return AccessTokenJWTClaims( | ||
subject=generic_claims.reserved_claims["sub"], | ||
scope=generic_claims.untyped_claims[_scope_claim], | ||
custom_claims=custom_claims, | ||
audience=generic_claims.reserved_claims["aud"], | ||
expires_at=generic_claims.reserved_claims["exp"], | ||
issued_at=generic_claims.reserved_claims["iat"], | ||
issuer=generic_claims.reserved_claims["iss"], | ||
not_before=generic_claims.reserved_claims["nbf"], | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
from typing import Any, Dict, List, Optional | ||
|
||
import pydantic | ||
|
||
from stytch.core.response_base import ResponseBase | ||
|
||
|
||
class AccessTokenJWTResponse(ResponseBase): | ||
"""Response type for `Sessions.introspect_idp_access_token`. | ||
Fields: | ||
- active: Whether or not this token is active. | ||
- sub: Subject of this JWT. | ||
- scope: A space-delimited string of scopes this JWT is granted. | ||
""" # noqa | ||
|
||
active: bool | ||
sub: Optional[str] = None | ||
scope: Optional[str] = None | ||
aud: Optional[List[str]] = [] | ||
exp: Optional[int] = None | ||
iat: Optional[int] = None | ||
iss: Optional[str] = None | ||
nbf: Optional[int] = None | ||
|
||
|
||
class AccessTokenJWTClaims(pydantic.BaseModel): | ||
"""Response type for `Sessions.introspect_idp_access_token`. | ||
Fields: | ||
- subject: The subject (either user_id or member_id) that the JWT is intended for. | ||
- scope: A space-delimited string of scopes this JWT is granted. | ||
- custom_claims: A dict of custom claims of the JWT. | ||
""" # noqa | ||
|
||
subject: str | ||
scope: Optional[str] | ||
custom_claims: Optional[Dict[str, Any]] = None | ||
audience: Optional[List[str]] | ||
expires_at: Optional[int] | ||
issued_at: Optional[int] | ||
issuer: Optional[str] | ||
not_before: Optional[int] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,8 @@ | |
import requests | ||
import requests.auth | ||
|
||
import json | ||
|
||
from stytch.version import __version__ | ||
|
||
HEADERS = { | ||
|
@@ -66,6 +68,17 @@ def post( | |
resp = requests.post(url, json=json, headers=final_headers, auth=self.auth) | ||
return self._response_from_request(resp) | ||
|
||
def postForm( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we name this |
||
self, | ||
url: str, | ||
form: Optional[Dict[str, Any]], | ||
headers: Optional[Dict[str, str]] = None, | ||
) -> ResponseWithJson: | ||
final_headers = self.headers.copy() | ||
final_headers.update(headers or {}) | ||
resp = requests.post(url, data=form, headers=final_headers, auth=self.auth) | ||
return self._response_from_request(resp) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does one flavor of post_form use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the sync flavor uses the |
||
|
||
def put( | ||
self, | ||
url: str, | ||
|
@@ -127,6 +140,16 @@ async def _response_from_request( | |
except Exception: | ||
resp_json = {} | ||
return ResponseWithJson(response=r, json=resp_json) | ||
|
||
async def _response_from_post_form_request( | ||
cls, r: aiohttp.ClientResponse | ||
) -> ResponseWithJson: | ||
try: | ||
content = await r.content.read() | ||
resp_json = json.loads(content.decode()) | ||
except Exception as e: | ||
resp_json = {} | ||
return ResponseWithJson(response=r, json=resp_json) | ||
|
||
async def get( | ||
self, | ||
|
@@ -153,6 +176,19 @@ async def post( | |
url, json=json, headers=final_headers, auth=self.auth | ||
) | ||
return await self._response_from_request(resp) | ||
|
||
async def postForm( | ||
self, | ||
url: str, | ||
form: Optional[Dict[str, Any]], | ||
headers: Optional[Dict[str, str]] = None, | ||
) -> ResponseWithJson: | ||
final_headers = self.headers.copy() | ||
final_headers.update(headers or {}) | ||
resp = await self._session.post( | ||
url, data=form, headers=final_headers, auth=self.auth | ||
) | ||
return await self._response_from_post_form_request(resp) | ||
|
||
async def put( | ||
self, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need
introspect_idp_access_token
since unlike Session JWTs there is no case where an access token can fail local validation and pass remote validation. If the access token is expired locally, it is also guaranteed to be expired serverside.