diff --git a/aws_lambda_powertools/shared/constants.py b/aws_lambda_powertools/shared/constants.py index bb8164d1d37..babf834e0b4 100644 --- a/aws_lambda_powertools/shared/constants.py +++ b/aws_lambda_powertools/shared/constants.py @@ -61,7 +61,7 @@ # JSON constants PRETTY_INDENT: int = 4 -COMPACT_INDENT = None +COMPACT_INDENT: None = None # Idempotency constants IDEMPOTENCY_DISABLED_ENV: str = "POWERTOOLS_IDEMPOTENCY_DISABLED" diff --git a/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py b/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py index 998d25384d4..48d3c96c84c 100644 --- a/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py +++ b/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py @@ -15,18 +15,18 @@ class APIGatewayEventAuthorizer(DictWrapper): @property - def claims(self) -> Optional[Dict[str, Any]]: - return self.get("claims") + def claims(self) -> Dict[str, Any]: + return self.get("claims") or {} # key might exist but can be `null` @property - def scopes(self) -> Optional[List[str]]: - return self.get("scopes") + def scopes(self) -> List[str]: + return self.get("scopes") or [] # key might exist but can be `null` @property - def principal_id(self) -> Optional[str]: + def principal_id(self) -> str: """The principal user identification associated with the token sent by the client and returned from an API Gateway Lambda authorizer (formerly known as a custom authorizer)""" - return self.get("principalId") + return self.get("principalId") or "" # key might exist but can be `null` @property def integration_latency(self) -> Optional[int]: @@ -91,7 +91,8 @@ def route_key(self) -> Optional[str]: @property def authorizer(self) -> APIGatewayEventAuthorizer: - return APIGatewayEventAuthorizer(self._data["requestContext"]["authorizer"]) + authz_data = self._data.get("requestContext", {}).get("authorizer", {}) + return APIGatewayEventAuthorizer(authz_data) class APIGatewayProxyEvent(BaseProxyEvent): @@ -112,11 +113,11 @@ def resource(self) -> str: @property def multi_value_headers(self) -> Dict[str, List[str]]: - return self.get("multiValueHeaders") or {} + return self.get("multiValueHeaders") or {} # key might exist but can be `null` @property def multi_value_query_string_parameters(self) -> Dict[str, List[str]]: - return self.get("multiValueQueryStringParameters") or {} + return self.get("multiValueQueryStringParameters") or {} # key might exist but can be `null` @property def resolved_query_string_parameters(self) -> Dict[str, List[str]]: @@ -154,72 +155,72 @@ def header_serializer(self) -> BaseHeadersSerializer: class RequestContextV2AuthorizerIam(DictWrapper): @property - def access_key(self) -> Optional[str]: + def access_key(self) -> str: """The IAM user access key associated with the request.""" - return self.get("accessKey") + return self.get("accessKey") or "" # key might exist but can be `null` @property - def account_id(self) -> Optional[str]: + def account_id(self) -> str: """The AWS account ID associated with the request.""" - return self.get("accountId") + return self.get("accountId") or "" # key might exist but can be `null` @property - def caller_id(self) -> Optional[str]: + def caller_id(self) -> str: """The principal identifier of the caller making the request.""" - return self.get("callerId") + return self.get("callerId") or "" # key might exist but can be `null` def _cognito_identity(self) -> Dict: - return self.get("cognitoIdentity", {}) or {} # not available in FunctionURL + return self.get("cognitoIdentity") or {} # not available in FunctionURL; key might exist but can be `null` @property - def cognito_amr(self) -> Optional[List[str]]: + def cognito_amr(self) -> List[str]: """This represents how the user was authenticated. AMR stands for Authentication Methods References as per the openid spec""" - return self._cognito_identity().get("amr") + return self._cognito_identity().get("amr", []) @property - def cognito_identity_id(self) -> Optional[str]: + def cognito_identity_id(self) -> str: """The Amazon Cognito identity ID of the caller making the request. Available only if the request was signed with Amazon Cognito credentials.""" - return self._cognito_identity().get("identityId") + return self._cognito_identity().get("identityId", "") @property - def cognito_identity_pool_id(self) -> Optional[str]: + def cognito_identity_pool_id(self) -> str: """The Amazon Cognito identity pool ID of the caller making the request. Available only if the request was signed with Amazon Cognito credentials.""" - return self._cognito_identity().get("identityPoolId") + return self._cognito_identity().get("identityPoolId") or "" # key might exist but can be `null` @property - def principal_org_id(self) -> Optional[str]: + def principal_org_id(self) -> str: """The AWS organization ID.""" - return self.get("principalOrgId") + return self.get("principalOrgId") or "" # key might exist but can be `null` @property - def user_arn(self) -> Optional[str]: + def user_arn(self) -> str: """The Amazon Resource Name (ARN) of the effective user identified after authentication.""" - return self.get("userArn") + return self.get("userArn") or "" # key might exist but can be `null` @property - def user_id(self) -> Optional[str]: + def user_id(self) -> str: """The IAM user ID of the effective user identified after authentication.""" - return self.get("userId") + return self.get("userId") or "" # key might exist but can be `null` class RequestContextV2Authorizer(DictWrapper): @property - def jwt_claim(self) -> Optional[Dict[str, Any]]: - jwt = self.get("jwt") or {} # not available in FunctionURL - return jwt.get("claims") + def jwt_claim(self) -> Dict[str, Any]: + jwt = self.get("jwt") or {} # not available in FunctionURL; key might exist but can be `null` + return jwt.get("claims") or {} # key might exist but can be `null` @property - def jwt_scopes(self) -> Optional[List[str]]: - jwt = self.get("jwt") or {} # not available in FunctionURL - return jwt.get("scopes") + def jwt_scopes(self) -> List[str]: + jwt = self.get("jwt") or {} # not available in FunctionURL; key might exist but can be `null` + return jwt.get("scopes", []) @property - def get_lambda(self) -> Optional[Dict[str, Any]]: + def get_lambda(self) -> Dict[str, Any]: """Lambda authorization context details""" - return self.get("lambda") + return self.get("lambda") or {} # key might exist but can be `null` def get_context(self) -> Dict[str, Any]: """Retrieve the authorization context details injected by a Lambda Authorizer. @@ -238,20 +239,20 @@ def get_context(self) -> Dict[str, Any]: Dict[str, Any] A dictionary containing Lambda authorization context details. """ - return self.get("lambda", {}) or {} + return self.get_lambda @property - def iam(self) -> Optional[RequestContextV2AuthorizerIam]: + def iam(self) -> RequestContextV2AuthorizerIam: """IAM authorization details used for making the request.""" - iam = self.get("iam") - return None if iam is None else RequestContextV2AuthorizerIam(iam) + iam = self.get("iam") or {} # key might exist but can be `null` + return RequestContextV2AuthorizerIam(iam) class RequestContextV2(BaseRequestContextV2): @property - def authorizer(self) -> Optional[RequestContextV2Authorizer]: - authorizer = self["requestContext"].get("authorizer") - return None if authorizer is None else RequestContextV2Authorizer(authorizer) + def authorizer(self) -> RequestContextV2Authorizer: + ctx = self.get("requestContext") or {} # key might exist but can be `null` + return RequestContextV2Authorizer(ctx.get("authorizer", {})) class APIGatewayProxyEventV2(BaseProxyEvent): diff --git a/tests/unit/data_classes/test_api_gateway_proxy_event.py b/tests/unit/data_classes/test_api_gateway_proxy_event.py index 7d464372135..d86e4b5e19b 100644 --- a/tests/unit/data_classes/test_api_gateway_proxy_event.py +++ b/tests/unit/data_classes/test_api_gateway_proxy_event.py @@ -89,8 +89,8 @@ def test_api_gateway_proxy_event(): assert request_context.api_id == request_context_raw["apiId"] authorizer = request_context.authorizer - assert authorizer.claims is None - assert authorizer.scopes is None + assert authorizer.claims == {} + assert authorizer.scopes == [] assert request_context.domain_name == request_context_raw["domainName"] assert request_context.domain_prefix == request_context_raw["domainPrefix"] @@ -144,8 +144,8 @@ def test_api_gateway_proxy_event_with_principal_id(): request_context = parsed_event.request_context authorizer = request_context.authorizer - assert authorizer.claims is None - assert authorizer.scopes is None + assert authorizer.claims == {} + assert authorizer.scopes == [] assert authorizer.principal_id == raw_event["requestContext"]["authorizer"]["principalId"] assert authorizer.integration_latency == raw_event["requestContext"]["authorizer"]["integrationLatency"] assert authorizer.get("integrationStatus", "failed") == "failed" diff --git a/tests/unit/data_classes/test_lambda_function_url.py b/tests/unit/data_classes/test_lambda_function_url.py index b7e8003d6c7..f8ce71b1543 100644 --- a/tests/unit/data_classes/test_lambda_function_url.py +++ b/tests/unit/data_classes/test_lambda_function_url.py @@ -1,4 +1,5 @@ from aws_lambda_powertools.utilities.data_classes import LambdaFunctionUrlEvent +from aws_lambda_powertools.utilities.data_classes.api_gateway_proxy_event import RequestContextV2Authorizer from tests.functional.utils import load_event @@ -47,7 +48,7 @@ def test_lambda_function_url_event(): assert http.source_ip == http_raw["sourceIp"] assert http.user_agent == http_raw["userAgent"] - assert request_context.authorizer is None + assert isinstance(request_context.authorizer, RequestContextV2Authorizer) def test_lambda_function_url_event_iam(): @@ -102,9 +103,9 @@ def test_lambda_function_url_event_iam(): authorizer = request_context.authorizer assert authorizer is not None - assert authorizer.jwt_claim is None - assert authorizer.jwt_scopes is None - assert authorizer.get_lambda is None + assert authorizer.jwt_claim == {} + assert authorizer.jwt_scopes == [] + assert authorizer.get_lambda == {} iam = authorizer.iam iam_raw = raw_event["requestContext"]["authorizer"]["iam"] @@ -112,9 +113,9 @@ def test_lambda_function_url_event_iam(): assert iam.access_key == iam_raw["accessKey"] assert iam.account_id == iam_raw["accountId"] assert iam.caller_id == iam_raw["callerId"] - assert iam.cognito_amr is None - assert iam.cognito_identity_id is None - assert iam.cognito_identity_pool_id is None - assert iam.principal_org_id is None + assert iam.cognito_amr == [] + assert iam.cognito_identity_id == "" + assert iam.cognito_identity_pool_id == "" + assert iam.principal_org_id == "" assert iam.user_id == iam_raw["userId"] assert iam.user_arn == iam_raw["userArn"]