Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Idempotency): add feature for manipulating idempotent responses #4037

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
Utility for adding idempotency to lambda functions
"""

from aws_lambda_powertools.utilities.idempotency.hook import (
IdempotentHookFunction,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
BasePersistenceLayer,
)
Expand All @@ -17,4 +20,5 @@
"idempotent",
"idempotent_function",
"IdempotencyConfig",
"IdempotentHookFunction",
)
14 changes: 12 additions & 2 deletions aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from copy import deepcopy
from typing import Any, Callable, Dict, Optional, Tuple

from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
from aws_lambda_powertools.utilities.idempotency.config import (
IdempotencyConfig,
)
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyAlreadyInProgressError,
IdempotencyInconsistentStateError,
Expand Down Expand Up @@ -227,7 +229,15 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Any]:
)
response_dict: Optional[dict] = data_record.response_json_as_dict()
if response_dict is not None:
return self.output_serializer.from_dict(response_dict)
serialized_response = self.output_serializer.from_dict(response_dict)
if self.config.response_hook is not None:
logger.debug("Response hook configured, invoking function")
return self.config.response_hook(
serialized_response,
data_record,
)
return serialized_response

return None

def _get_function_response(self):
Expand Down
5 changes: 5 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Dict, Optional

from aws_lambda_powertools.utilities.idempotency import IdempotentHookFunction
from aws_lambda_powertools.utilities.typing import LambdaContext


Expand All @@ -15,6 +16,7 @@ def __init__(
local_cache_max_items: int = 256,
hash_function: str = "md5",
lambda_context: Optional[LambdaContext] = None,
response_hook: Optional[IdempotentHookFunction] = None,
):
"""
Initialize the base persistence layer
Expand All @@ -37,6 +39,8 @@ def __init__(
Function to use for calculating hashes, by default md5.
lambda_context: LambdaContext, optional
Lambda Context containing information about the invocation, function and execution environment.
response_hook: IdempotentHookFunction, optional
Hook function to be called when an idempotent response is returned from the idempotent store.
"""
self.event_key_jmespath = event_key_jmespath
self.payload_validation_jmespath = payload_validation_jmespath
Expand All @@ -47,6 +51,7 @@ def __init__(
self.local_cache_max_items = local_cache_max_items
self.hash_function = hash_function
self.lambda_context: Optional[LambdaContext] = lambda_context
self.response_hook: Optional[IdempotentHookFunction] = response_hook

def register_lambda_context(self, lambda_context: LambdaContext):
"""Captures the Lambda context, to calculate the remaining time before the invocation times out"""
Expand Down
13 changes: 13 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import Any

from aws_lambda_powertools.shared.types import Protocol
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import DataRecord


class IdempotentHookFunction(Protocol):
"""
The IdempotentHookFunction.
This class defines the calling signature for IdempotentHookFunction callbacks.
"""

def __call__(self, response: Any, idempotent_data: DataRecord) -> Any: ...
87 changes: 76 additions & 11 deletions docs/utilities/idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ We currently support Amazon DynamoDB and Redis as a storage layer. The following
If you're not [changing the default configuration for the DynamoDB persistence layer](#dynamodbpersistencelayer), this is the expected default configuration:

| Configuration | Value | Notes |
| ------------------ | ------------ | ----------------------------------------------------------------------------------- |
| Partition key | `id` |
| ------------------ | ------------ |-------------------------------------------------------------------------------------|
| Partition key | `id` | |
| TTL attribute name | `expiration` | This can only be configured after your table is created if you're using AWS Console |

???+ tip "Tip: You can share a single state table for all functions"
Expand Down Expand Up @@ -454,6 +454,40 @@ sequenceDiagram
<i>Idempotent successful request cached</i>
</center>

#### Successful request with response_hook configured

<center>
```mermaid
sequenceDiagram
participant Client
participant Lambda
participant Response hook
participant Persistence Layer
alt initial request
Client->>Lambda: Invoke (event)
Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
activate Persistence Layer
Note over Lambda,Persistence Layer: Set record status to INPROGRESS. <br> Prevents concurrent invocations <br> with the same payload
Lambda-->>Lambda: Call your function
Lambda->>Persistence Layer: Update record with result
deactivate Persistence Layer
Persistence Layer-->>Persistence Layer: Update record
Note over Lambda,Persistence Layer: Set record status to COMPLETE. <br> New invocations with the same payload <br> now return the same result
Lambda-->>Client: Response sent to client
else retried request
Client->>Lambda: Invoke (event)
Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload)
activate Persistence Layer
Persistence Layer-->>Response hook: Already exists in persistence layer.
deactivate Persistence Layer
Note over Response hook,Persistence Layer: Record status is COMPLETE and not expired
Response hook->>Lambda: Response hook invoked
Lambda-->>Client: Manipulated idempotent response sent to client
end
```
<i>Successful idempotent request with a response hook</i>
</center>

#### Expired idempotency records

<center>
Expand Down Expand Up @@ -699,15 +733,16 @@ For advanced configurations, such as setting up SSL certificates or customizing

Idempotent decorator can be further configured with **`IdempotencyConfig`** as seen in the previous example. These are the available options for further configuration

| Parameter | Default | Description |
| ------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **event_key_jmespath** | `""` | JMESPath expression to extract the idempotency key from the event record using [built-in functions](./jmespath_functions.md#built-in-jmespath-functions){target="_blank"} |
| **payload_validation_jmespath** | `""` | JMESPath expression to validate whether certain parameters have changed in the event while the event payload |
| **raise_on_no_idempotency_key** | `False` | Raise exception if no idempotency key was found in the request |
| **expires_after_seconds** | 3600 | The number of seconds to wait before a record is expired |
| **use_local_cache** | `False` | Whether to locally cache idempotency results |
| **local_cache_max_items** | 256 | Max number of items to store in local cache |
| **hash_function** | `md5` | Function to use for calculating hashes, as provided by [hashlib](https://docs.python.org/3/library/hashlib.html){target="_blank" rel="nofollow"} in the standard library. |
| Parameter | Default | Description |
|---------------------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **event_key_jmespath** | `""` | JMESPath expression to extract the idempotency key from the event record using [built-in functions](./jmespath_functions.md#built-in-jmespath-functions){target="_blank"} |
| **payload_validation_jmespath** | `""` | JMESPath expression to validate whether certain parameters have changed in the event while the event payload |
| **raise_on_no_idempotency_key** | `False` | Raise exception if no idempotency key was found in the request |
| **expires_after_seconds** | 3600 | The number of seconds to wait before a record is expired |
| **use_local_cache** | `False` | Whether to locally cache idempotency results |
| **local_cache_max_items** | 256 | Max number of items to store in local cache |
| **hash_function** | `md5` | Function to use for calculating hashes, as provided by [hashlib](https://docs.python.org/3/library/hashlib.html){target="_blank" rel="nofollow"} in the standard library. |
| **response_hook** | `None` | Function to use for processing the stored Idempotent response. This function hook is called when an existing idempotent response is found. See [Manipulating The Idempotent Response](idempotency.md#manipulating-the-idempotent-response) |

### Handling concurrent executions with the same payload

Expand Down Expand Up @@ -909,6 +944,36 @@ You can create your own persistent store from scratch by inheriting the `BasePer

For example, the `_put_record` method needs to raise an exception if a non-expired record already exists in the data store with a matching key.

### Manipulating the Idempotent Response

You can set up a `response_hook` in the `IdempotentConfig` class to manipulate the returned data when an operation is idempotent. The hook function will be called with the current deserialized response object and the Idempotency record.

=== "Using an Idempotent Response Hook"

```python hl_lines="18 20 23 32"
--8<-- "examples/idempotency/src/working_with_response_hook.py"
```

=== "Sample event"

```json
--8<-- "examples/idempotency/src/working_with_response_hook_payload.json"
```

???+ info "Info: Using custom de-serialization?"

The response_hook is called after the custom de-serialization so the payload you process will be the de-serialized version.

#### Being a good citizen

When using response hooks to manipulate returned data from idempotent operations, it's important to follow best practices to avoid introducing complexity or issues. Keep these guidelines in mind:

1. **Response hook works exclusively when operations are idempotent.** The hook will not be called when an operation is not idempotent, or when the idempotent logic fails.

2. **Catch and Handle Exceptions.** Your response hook code should catch and handle any exceptions that may arise from your logic. Unhandled exceptions will cause the Lambda function to fail unexpectedly.

3. **Keep Hook Logic Simple** Response hooks should consist of minimal and straightforward logic for manipulating response data. Avoid complex conditional branching and aim for hooks that are easy to reason about.

## Compatibility with other utilities

### Batch
Expand Down
56 changes: 56 additions & 0 deletions examples/idempotency/src/working_with_response_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import datetime
import uuid
from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.idempotency import (
DynamoDBPersistenceLayer,
IdempotencyConfig,
idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
DataRecord,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


def my_response_hook(response: Dict, idempotent_data: DataRecord) -> Dict:
# Return inserted Header data into the Idempotent Response
response["x-idempotent-key"] = idempotent_data.idempotency_key

# expiry_timestamp could be None so include if set
expiry_timestamp = idempotent_data.expiry_timestamp
if expiry_timestamp:
expiry_time = datetime.datetime.fromtimestamp(int(expiry_timestamp))
response["x-idempotent-expiration"] = expiry_time.isoformat()

# Must return the response here
return response


dynamodb = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(response_hook=my_response_hook)


@idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb)
def process_order(order: dict) -> dict:
# create the order_id
order_id = str(uuid.uuid4())

# create your logic to save the order
# append the order_id created
order["order_id"] = order_id

# return the order
return {"order": order}


def lambda_handler(event: dict, context: LambdaContext):
config.register_lambda_context(context) # see Lambda timeouts section
try:
logger.info(f"Processing order id {event.get('order_id')}")
return process_order(order=event.get("order"))
except Exception as err:
return {"status_code": 400, "error": f"Error processing {str(err)}"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"order" : {
"user_id": "xyz",
"product_id": "123456789",
"quantity": 2,
"value": 30
}
}
Loading