-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
e886157
commit 0ee1973
Showing
6 changed files
with
255 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# !!! | ||
# WARNING: This file is autogenerated | ||
# Only modify code within MANUAL() sections | ||
# or your changes may be overwritten later! | ||
# !!! | ||
|
||
from __future__ import annotations | ||
|
||
from typing import Any, Dict | ||
|
||
from stytch.b2b.models.impersonation import AuthenticateResponse | ||
from stytch.core.api_base import ApiBase | ||
from stytch.core.http.client import AsyncClient, SyncClient | ||
|
||
|
||
class Impersonation: | ||
def __init__( | ||
self, api_base: ApiBase, sync_client: SyncClient, async_client: AsyncClient | ||
) -> None: | ||
self.api_base = api_base | ||
self.sync_client = sync_client | ||
self.async_client = async_client | ||
|
||
def authenticate( | ||
self, | ||
token: str, | ||
) -> AuthenticateResponse: | ||
headers: Dict[str, str] = {} | ||
data: Dict[str, Any] = { | ||
"token": token, | ||
} | ||
|
||
url = self.api_base.url_for("/v1/b2b/impersonation/authenticate", data) | ||
res = self.sync_client.post(url, data, headers) | ||
return AuthenticateResponse.from_json(res.response.status_code, res.json) | ||
|
||
async def authenticate_async( | ||
self, | ||
token: str, | ||
) -> AuthenticateResponse: | ||
headers: Dict[str, str] = {} | ||
data: Dict[str, Any] = { | ||
"token": token, | ||
} | ||
|
||
url = self.api_base.url_for("/v1/b2b/impersonation/authenticate", data) | ||
res = await self.async_client.post(url, data, headers) | ||
return AuthenticateResponse.from_json(res.response.status, res.json) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# !!! | ||
# WARNING: This file is autogenerated | ||
# Only modify code within MANUAL() sections | ||
# or your changes may be overwritten later! | ||
# !!! | ||
|
||
from __future__ import annotations | ||
|
||
from typing import Optional | ||
|
||
from stytch.b2b.models.mfa import MfaRequired | ||
from stytch.b2b.models.organizations import Member, Organization | ||
from stytch.b2b.models.sessions import MemberSession | ||
from stytch.core.response_base import ResponseBase | ||
|
||
|
||
class AuthenticateResponse(ResponseBase): | ||
member_id: str | ||
organization_id: str | ||
member: Member | ||
session_token: str | ||
session_jwt: str | ||
organization: Organization | ||
intermediate_session_token: str | ||
member_authenticated: bool | ||
member_session: Optional[MemberSession] = None | ||
mfa_required: Optional[MfaRequired] = None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Any, Dict, Optional | ||
|
||
import jwt | ||
|
||
from stytch.consumer.models.idp import AccessTokenJWTClaims, AccessTokenJWTResponse | ||
from stytch.core.api_base import ApiBase | ||
from stytch.core.http.client import AsyncClient, SyncClient | ||
from stytch.shared import jwt_helpers | ||
|
||
|
||
class IDP: | ||
def __init__( | ||
self, | ||
api_base: ApiBase, | ||
sync_client: SyncClient, | ||
async_client: AsyncClient, | ||
jwks_client: jwt.PyJWKClient, | ||
project_id: str, | ||
) -> None: | ||
self.api_base = api_base | ||
self.sync_client = sync_client | ||
self.async_client = async_client | ||
self.jwks_client = jwks_client | ||
self.project_id = project_id | ||
|
||
# MANUAL(introspect_idp_access_token)(SERVICE_METHOD) | ||
# ADDIMPORT: from typing import Optional | ||
# ADDIMPORT: from stytch.b2b.models.idp import AccessTokenJWTResponse | ||
def introspect_idp_access_token( | ||
self, | ||
access_token: str, | ||
client_id: str, | ||
client_secret: Optional[str] = None, | ||
grant_type: str = "authorization_code", | ||
token_type_hint: str = "access_token", | ||
) -> Optional[AccessTokenJWTClaims]: | ||
return self.introspect_idp_access_token_local( | ||
access_token, client_id | ||
) or self.introspect_idp_access_token_network( | ||
access_token, client_id, client_secret, grant_type, token_type_hint | ||
) | ||
|
||
# ENDMANUAL(introspect_idp_access_token) | ||
|
||
# MANUAL(introspect_idp_access_token_network)(SERVICE_METHOD) | ||
# ADDIMPORT: from typing import Optional | ||
# ADDIMPORT: from stytch.b2b.models.idp import AccessTokenJWTClaims, AccessTokenJWTResponse | ||
# ADDIMPORT: from stytch.shared import jwt_helpers | ||
# ADDIMPORT: from stytch.shared import rbac_local | ||
def introspect_idp_access_token_network( | ||
self, | ||
access_token: str, | ||
client_id: str, | ||
client_secret: Optional[str] = None, | ||
grant_type: str = "authorization_code", | ||
token_type_hint: str = "access_token", | ||
) -> Optional[AccessTokenJWTClaims]: | ||
headers: Dict[str, str] = {"Content-Type": "application/x-www-form-urlencoded"} | ||
data: Dict[str, Any] = { | ||
"token": access_token, | ||
"client_id": client_id, | ||
"grant_type": grant_type, | ||
"token_type_hint": token_type_hint, | ||
} | ||
if client_secret is not None: | ||
data["client_secret"] = client_secret | ||
|
||
url = self.api_base.url_for( | ||
f"/v1/public/{self.project_id}/oauth2/introspect", data | ||
) | ||
res = self.sync_client.postForm(url, data, headers) | ||
jwtResponse = AccessTokenJWTResponse.from_json( | ||
res.response.status_code, res.json | ||
) | ||
if not jwtResponse.active: | ||
return None | ||
return AccessTokenJWTClaims( | ||
subject=jwtResponse.sub, scopes=jwtResponse.scope, custom_claims={} | ||
) | ||
|
||
# ENDMANUAL(introspect_idp_access_token_network) | ||
|
||
# MANUAL(introspect_idp_access_token_local)(SERVICE_METHOD) | ||
# ADDIMPORT: from typing import Optional | ||
# ADDIMPORT: from stytch.b2b.models.sessions import AccessTokenJWTClaims | ||
# ADDIMPORT: from stytch.shared import jwt_helpers | ||
def introspect_idp_access_token_local( | ||
self, | ||
access_token: str, | ||
client_id: str, | ||
) -> Optional[AccessTokenJWTClaims]: | ||
_scope_claim = "scope" | ||
generic_claims = jwt_helpers.authenticate_jwt_local( | ||
project_id=self.project_id, | ||
jwks_client=self.jwks_client, | ||
jwt=access_token, | ||
custom_audience=client_id, | ||
custom_issuer=f"https://stytch.com/{self.project_id}", | ||
) | ||
if generic_claims is None: | ||
return None | ||
|
||
custom_claims = { | ||
k: v for k, v in generic_claims.untyped_claims.items() if k != _scope_claim | ||
} | ||
|
||
return AccessTokenJWTClaims( | ||
subject=generic_claims.reserved_claims["sub"], | ||
scopes=generic_claims.untyped_claims[_scope_claim], | ||
custom_claims=custom_claims, | ||
) | ||
|
||
# ENDMANUAL(introspect_idp_access_token_local) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from typing import Any, Dict, Optional | ||
|
||
import pydantic | ||
|
||
from stytch.core.response_base import ResponseBase | ||
|
||
|
||
# MANUAL(AccessTokenJWTResponse)(TYPES) | ||
# ADDIMPORT: from typing import Any, Dict, List, Optional | ||
# ADDIMPORT: import pydantic | ||
class AccessTokenJWTResponse(ResponseBase): | ||
"""Response type for `Sessions.introspect_idp_access_token`. | ||
Fields: | ||
- active: Whether or not this token is active. | ||
- sub: Subject of this JWT. | ||
- scope: Scopes that this JWT is granted. | ||
""" # noqa | ||
|
||
active: bool | ||
sub: Optional[str] = None | ||
scope: Optional[str] = None | ||
|
||
|
||
# ENDMANUAL(AccessTokenJWTResponse) | ||
|
||
|
||
# MANUAL(AccessTokenJWTClaims)(TYPES) | ||
# ADDIMPORT: from typing import Any, Dict, List, Optional | ||
# ADDIMPORT: import pydantic | ||
class AccessTokenJWTClaims(pydantic.BaseModel): | ||
"""Response type for `Sessions.introspect_idp_access_token`. | ||
Fields: | ||
- subject: The subject (either user_id or member_id) that the JWT is intended for. | ||
- scopes: A list of scopes granted, separated by spaces. | ||
- custom_claims: A dict of custom claims of the JWT. | ||
""" # noqa | ||
|
||
subject: str | ||
scopes: Optional[str] | ||
custom_claims: Optional[Dict[str, Any]] = None | ||
|
||
|
||
# ENDMANUAL(AccessTokenJWTClaims) |