Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IDP token introspection #229

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions stytch/b2b/api/idp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from __future__ import annotations

from typing import Any, Dict, Optional

import jwt

from stytch.b2b.models.idp import AccessTokenJWTClaims, AccessTokenJWTResponse
from stytch.core.api_base import ApiBase
from stytch.core.http.client import AsyncClient, SyncClient
from stytch.shared import jwt_helpers
from stytch.shared.policy_cache import PolicyCache


class IDP:
def __init__(
self,
api_base: ApiBase,
sync_client: SyncClient,
async_client: AsyncClient,
jwks_client: jwt.PyJWKClient,
project_id: str,
policy_cache: PolicyCache,
) -> None:
self.api_base = api_base
self.sync_client = sync_client
self.async_client = async_client
self.policy_cache = policy_cache
self.jwks_client = jwks_client
self.project_id = project_id

# MANUAL(introspect_idp_access_token)(SERVICE_METHOD)
max-stytch marked this conversation as resolved.
Show resolved Hide resolved
# ADDIMPORT: from typing import Optional
# ADDIMPORT: from stytch.b2b.models.idp import AccessTokenJWTResponse
def introspect_idp_access_token(
self,
access_token: str,
client_id: str,
client_secret: Optional[str] = None,
grant_type: str = "authorization_code",
token_type_hint: str = "access_token",
) -> Optional[AccessTokenJWTClaims]:
return self.introspect_idp_access_token_local(
access_token, client_id
) or self.introspect_idp_access_token_network(
access_token, client_id, client_secret, grant_type, token_type_hint
)

# ENDMANUAL(introspect_idp_access_token)

# MANUAL(introspect_idp_access_token_network)(SERVICE_METHOD)
# ADDIMPORT: from typing import Optional
# ADDIMPORT: from stytch.b2b.models.idp import AccessTokenJWTClaims, AccessTokenJWTResponse
# ADDIMPORT: from stytch.shared import jwt_helpers
# ADDIMPORT: from stytch.shared import rbac_local
def introspect_idp_access_token_network(
self,
access_token: str,
client_id: str,
client_secret: Optional[str] = None,
grant_type: str = "authorization_code",
token_type_hint: str = "access_token",
) -> Optional[AccessTokenJWTClaims]:
headers: Dict[str, str] = {"Content-Type": "application/x-www-form-urlencoded"}
data: Dict[str, Any] = {
"token": access_token,
"client_id": client_id,
"grant_type": grant_type,
"token_type_hint": token_type_hint,
}
if client_secret is not None:
data["client_secret"] = client_secret

url = self.api_base.url_for(
f"/v1/public/{self.project_id}/oauth2/introspect", data
)
res = self.sync_client.postForm(url, data, headers)
jwtResponse = AccessTokenJWTResponse.from_json(
res.response.status_code, res.json
)
if not jwtResponse.active:
return None
return AccessTokenJWTClaims(
subject=jwtResponse.sub, scopes=jwtResponse.scope, custom_claims=None
max-stytch marked this conversation as resolved.
Show resolved Hide resolved
)

# ENDMANUAL(introspect_idp_access_token_network)

# MANUAL(introspect_idp_access_token_local)(SERVICE_METHOD)
# ADDIMPORT: from typing import Optional
# ADDIMPORT: from stytch.b2b.models.sessions import AccessTokenJWTClaims
# ADDIMPORT: from stytch.shared import jwt_helpers
def introspect_idp_access_token_local(
self,
access_token: str,
client_id: str,
) -> Optional[AccessTokenJWTClaims]:
_scope_claim = "scope"
generic_claims = jwt_helpers.authenticate_jwt_local(
project_id=self.project_id,
jwks_client=self.jwks_client,
jwt=access_token,
custom_audience=client_id,
custom_issuer=f"https://stytch.com/{self.project_id}",
)
if generic_claims is None:
return None

custom_claims = {
k: v for k, v in generic_claims.untyped_claims.items() if k != _scope_claim
}

return AccessTokenJWTClaims(
subject=generic_claims.reserved_claims["sub"],
scopes=generic_claims.untyped_claims[_scope_claim],
custom_claims=custom_claims,
)

# ENDMANUAL(introspect_idp_access_token_local)
48 changes: 48 additions & 0 deletions stytch/b2b/api/impersonation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# !!!
# WARNING: This file is autogenerated
# Only modify code within MANUAL() sections
# or your changes may be overwritten later!
# !!!

from __future__ import annotations

from typing import Any, Dict

from stytch.b2b.models.impersonation import AuthenticateResponse
from stytch.core.api_base import ApiBase
from stytch.core.http.client import AsyncClient, SyncClient


class Impersonation:
def __init__(
self, api_base: ApiBase, sync_client: SyncClient, async_client: AsyncClient
) -> None:
self.api_base = api_base
self.sync_client = sync_client
self.async_client = async_client

def authenticate(
self,
token: str,
) -> AuthenticateResponse:
headers: Dict[str, str] = {}
data: Dict[str, Any] = {
"token": token,
}

url = self.api_base.url_for("/v1/b2b/impersonation/authenticate", data)
res = self.sync_client.post(url, data, headers)
return AuthenticateResponse.from_json(res.response.status_code, res.json)

async def authenticate_async(
self,
token: str,
) -> AuthenticateResponse:
headers: Dict[str, str] = {}
data: Dict[str, Any] = {
"token": token,
}

url = self.api_base.url_for("/v1/b2b/impersonation/authenticate", data)
res = await self.async_client.post(url, data, headers)
return AuthenticateResponse.from_json(res.response.status, res.json)
14 changes: 14 additions & 0 deletions stytch/b2b/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import jwt

from stytch.b2b.api.discovery import Discovery
from stytch.b2b.api.impersonation import Impersonation
from stytch.b2b.api.magic_links import MagicLinks
from stytch.b2b.api.oauth import OAuth
from stytch.b2b.api.organizations import Organizations
Expand All @@ -23,6 +24,7 @@
from stytch.b2b.api.sso import SSO
from stytch.b2b.api.totps import TOTPs
from stytch.consumer.api.fraud import Fraud
from stytch.consumer.api.idp import IDP
from stytch.consumer.api.m2m import M2M
from stytch.consumer.api.project import Project
from stytch.core.client_base import ClientBase
Expand Down Expand Up @@ -72,6 +74,11 @@ def __init__(
sync_client=self.sync_client,
async_client=self.async_client,
)
self.impersonation = Impersonation(
api_base=self.api_base,
sync_client=self.sync_client,
async_client=self.async_client,
)
self.m2m = M2M(
api_base=self.api_base,
sync_client=self.sync_client,
Expand Down Expand Up @@ -142,6 +149,13 @@ def __init__(
sync_client=self.sync_client,
async_client=self.async_client,
)
self.idp = IDP(
api_base=self.api_base,
sync_client=self.sync_client,
async_client=self.async_client,
jwks_client=self.jwks_client,
project_id=project_id,
)

def get_jwks_client(self, project_id: str) -> jwt.PyJWKClient:
data = {"project_id": project_id}
Expand Down
43 changes: 43 additions & 0 deletions stytch/b2b/models/idp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Any, Dict, Optional

import pydantic

from stytch.core.response_base import ResponseBase


# MANUAL(AccessTokenJWTResponse)(TYPES)
# ADDIMPORT: from typing import Any, Dict, List, Optional
# ADDIMPORT: import pydantic
class AccessTokenJWTResponse(ResponseBase):
"""Response type for `Sessions.introspect_idp_access_token`.
Fields:
- active: Whether or not this token is active.
- sub: Subject of this JWT.
- scope: Scopes that this JWT is granted.
""" # noqa

active: bool
sub: Optional[str] = None
scope: Optional[str] = None


# ENDMANUAL(AccessTokenJWTResponse)


# MANUAL(AccessTokenJWTClaims)(TYPES)
# ADDIMPORT: from typing import Any, Dict, List, Optional
# ADDIMPORT: import pydantic
class AccessTokenJWTClaims(pydantic.BaseModel):
"""Response type for `Sessions.introspect_idp_access_token`.
Fields:
- subject: The subject (either user_id or member_id) that the JWT is intended for.
- scopes: A list of scopes granted, separated by spaces.
- custom_claims: A dict of custom claims of the JWT.
""" # noqa

subject: str
scopes: Optional[str]
max-stytch marked this conversation as resolved.
Show resolved Hide resolved
custom_claims: Optional[Dict[str, Any]] = None
max-stytch marked this conversation as resolved.
Show resolved Hide resolved


# ENDMANUAL(AccessTokenJWTClaims)
27 changes: 27 additions & 0 deletions stytch/b2b/models/impersonation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# !!!
# WARNING: This file is autogenerated
# Only modify code within MANUAL() sections
# or your changes may be overwritten later!
# !!!

from __future__ import annotations

from typing import Optional

from stytch.b2b.models.mfa import MfaRequired
from stytch.b2b.models.organizations import Member, Organization
from stytch.b2b.models.sessions import MemberSession
from stytch.core.response_base import ResponseBase


class AuthenticateResponse(ResponseBase):
member_id: str
organization_id: str
member: Member
session_token: str
session_jwt: str
organization: Organization
intermediate_session_token: str
member_authenticated: bool
member_session: Optional[MemberSession] = None
mfa_required: Optional[MfaRequired] = None
111 changes: 111 additions & 0 deletions stytch/consumer/api/idp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from __future__ import annotations

from typing import Any, Dict, Optional

import jwt

from stytch.consumer.models.idp import AccessTokenJWTClaims, AccessTokenJWTResponse
from stytch.core.api_base import ApiBase
from stytch.core.http.client import AsyncClient, SyncClient
from stytch.shared import jwt_helpers


class IDP:
def __init__(
self,
api_base: ApiBase,
sync_client: SyncClient,
async_client: AsyncClient,
jwks_client: jwt.PyJWKClient,
project_id: str,
) -> None:
self.api_base = api_base
self.sync_client = sync_client
self.async_client = async_client
self.jwks_client = jwks_client
self.project_id = project_id

def introspect_idp_access_token(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need introspect_idp_access_token since unlike Session JWTs there is no case where an access token can fail local validation and pass remote validation. If the access token is expired locally, it is also guaranteed to be expired serverside.

self,
access_token: str,
client_id: str,
client_secret: Optional[str] = None,
grant_type: str = "authorization_code",
token_type_hint: str = "access_token",
) -> Optional[AccessTokenJWTClaims]:
return self.introspect_idp_access_token_local(
access_token, client_id
) or self.introspect_idp_access_token_network(
access_token, client_id, client_secret, grant_type, token_type_hint
)

def introspect_idp_access_token_network(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming nits:

  • Since the classname is already idp - do we need the second idp in the name? idp.introspect_idp_ could be idp.introspect_
  • Since this works for both access and refresh tokens, suggest removing access_ from the name

I think introspect_token_network works better.

self,
access_token: str,
client_id: str,
client_secret: Optional[str] = None,
grant_type: str = "authorization_code",
token_type_hint: str = "access_token",
) -> Optional[AccessTokenJWTClaims]:
headers: Dict[str, str] = {"Content-Type": "application/x-www-form-urlencoded"}
data: Dict[str, Any] = {
"token": access_token,
"client_id": client_id,
"grant_type": grant_type,
max-stytch marked this conversation as resolved.
Show resolved Hide resolved
"token_type_hint": token_type_hint,
}
if client_secret is not None:
data["client_secret"] = client_secret

url = self.api_base.url_for(
f"/v1/public/{self.project_id}/oauth2/introspect", data
)
res = self.sync_client.postForm(url, data, headers)
jwtResponse = AccessTokenJWTResponse.from_json(
res.response.status_code, res.json
)
if not jwtResponse.active:
return None
return AccessTokenJWTClaims(
subject=jwtResponse.sub,
scope=jwtResponse.scope,
custom_claims={},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return custom claims that we retrieve from the introspection response here

audience=jwtResponse.aud,
expires_at=jwtResponse.exp,
issued_at=jwtResponse.iat,
issuer=jwtResponse.iss,
not_before=jwtResponse.nbf,
)

def introspect_idp_access_token_local(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in contrast this method is only valid for access tokens

self,
access_token: str,
client_id: str,
) -> Optional[AccessTokenJWTClaims]:
_scope_claim = "scope"
generic_claims = jwt_helpers.authenticate_jwt_local(
project_id=self.project_id,
jwks_client=self.jwks_client,
jwt=access_token,
custom_audience=client_id,
custom_issuer=f"https://stytch.com/{self.project_id}",
)
if generic_claims is None:
return None

custom_claims = {
k: v for k, v in generic_claims.untyped_claims.items() if k != _scope_claim
}

print(generic_claims)
max-stytch marked this conversation as resolved.
Show resolved Hide resolved

return AccessTokenJWTClaims(
subject=generic_claims.reserved_claims["sub"],
scope=generic_claims.untyped_claims[_scope_claim],
custom_claims=custom_claims,
audience=generic_claims.reserved_claims["aud"],
expires_at=generic_claims.reserved_claims["exp"],
issued_at=generic_claims.reserved_claims["iat"],
issuer=generic_claims.reserved_claims["iss"],
not_before=generic_claims.reserved_claims["nbf"],
)
Loading
Loading