forked from openwallet-foundation/acapy-plugins
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: move public routes to new module
Signed-off-by: Daniel Bluhm <[email protected]>
- Loading branch information
Showing
2 changed files
with
131 additions
and
122 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
"""Public routes for OID4VCI.""" | ||
|
||
import logging | ||
from typing import Optional | ||
from aries_cloudagent.core.profile import Profile | ||
import jwt as pyjwt | ||
|
||
from aiohttp import web | ||
from aiohttp_apispec import ( | ||
docs, | ||
querystring_schema, | ||
request_schema, | ||
) | ||
from aries_cloudagent.messaging.models.openapi import OpenAPISchema | ||
from aries_cloudagent.wallet.jwt import jwt_verify | ||
from marshmallow import fields | ||
from .models.cred_sup_record import OID4VCICredentialSupported | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class IssueCredentialRequestSchema(OpenAPISchema): | ||
"""Request schema for the /credential endpoint.""" | ||
|
||
format = fields.Str( | ||
required=True, | ||
metadata={"description": "The client ID for the token request.", "example": ""}, | ||
) | ||
types = fields.List( | ||
fields.Str(), | ||
metadata={"description": "List of connection records"}, | ||
) | ||
credentialsSubject = fields.Dict(metadata={"description": ""}) | ||
proof = fields.Dict(metadata={"description": ""}) | ||
|
||
|
||
class TokenRequestSchema(OpenAPISchema): | ||
"""Request schema for the /token endpoint.""" | ||
|
||
client_id = fields.Str( | ||
required=True, | ||
metadata={"description": "The client ID for the token request.", "example": ""}, | ||
) | ||
|
||
|
||
class GetTokenSchema(OpenAPISchema): | ||
"""Schema for ...""" | ||
|
||
grant_type = fields.Str(required=True, metadata={"description": "", "example": ""}) | ||
|
||
pre_authorized_code = fields.Str( | ||
required=True, metadata={"description": "", "example": ""} | ||
) | ||
|
||
|
||
@docs(tags=["oid4vci"], summary="Get credential issuer metadata") | ||
@querystring_schema(TokenRequestSchema()) | ||
async def oid_cred_issuer(request: web.Request): | ||
"""Credential issuer metadata endpoint.""" | ||
profile = request["context"].profile | ||
public_url = profile.context.settings.get("public_url") # TODO: check | ||
|
||
# Wallet query to retrieve credential definitions | ||
tag_filter = {"type": {"$in": ["sd_jwt", "jwt_vc_json"]}} | ||
async with profile.session() as session: | ||
credentials_supported = await OID4VCICredentialSupported.query( | ||
session, tag_filter | ||
) | ||
|
||
metadata = { | ||
"credential_issuer": f"{public_url}/issuer", | ||
"credential_endpoint": f"{public_url}/credential", | ||
"credentials_supported": [cred.serialize() for cred in credentials_supported], | ||
"authorization_server": f"{public_url}/auth-server", | ||
"batch_credential_endpoint": f"{public_url}/batch_credential", | ||
} | ||
|
||
return web.json_response(metadata) | ||
|
||
|
||
async def check_token(profile: Profile, auth_header: Optional[str] = None): | ||
"""Validate the OID4VCI token.""" | ||
if not auth_header: | ||
raise web.HTTPUnauthorized() # no authentication | ||
|
||
scheme, cred = auth_header.split(" ") | ||
if scheme.lower() != "bearer" or (): | ||
raise web.HTTPUnauthorized() # Invalid authentication credentials | ||
|
||
jwt_header = pyjwt.get_unverified_header(cred) | ||
if "did:key:" not in jwt_header["kid"]: | ||
raise web.HTTPUnauthorized() # Invalid authentication credentials | ||
|
||
result = await jwt_verify(profile, cred) | ||
if not result.valid: | ||
raise web.HTTPUnauthorized() # Invalid credentials | ||
|
||
|
||
@docs(tags=["oid4vci"], summary="Issue a credential") | ||
@request_schema(IssueCredentialRequestSchema()) | ||
async def issue_cred(request: web.Request): | ||
"""Credential issuance endpoint.""" | ||
profile = request["context"].profile | ||
await check_token(profile, request.headers.get("Authorization")) | ||
|
||
|
||
@docs(tags=["oid4vci"], summary="Get credential issuance token") | ||
@querystring_schema(TokenRequestSchema()) | ||
async def get_token(request: web.Request): | ||
"""Token endpoint to exchange pre_authorized codes for access tokens.""" | ||
|
||
|
||
async def register(app: web.Application): | ||
"""Register routes.""" | ||
app.add_routes( | ||
[ | ||
web.get( | ||
"/.well-known/openid-credential-issuer", | ||
oid_cred_issuer, | ||
allow_head=False, | ||
), | ||
# web.get("/.well-known/", self., allow_head=False), | ||
# web.get("/.well-known/", self., allow_head=False), | ||
web.post("/credential", issue_cred), | ||
web.post("/token", get_token), | ||
] | ||
) |