-
Notifications
You must be signed in to change notification settings - Fork 408
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(data_masking): add new sensitive data masking utility (#2197)
Co-authored-by: Roger Zhang <[email protected]> Co-authored-by: Leandro Damascena <[email protected]> Co-authored-by: Roy Assis <[email protected]> Co-authored-by: Ruben Fonseca <[email protected]> Co-authored-by: Roger Zhang <[email protected]> Co-authored-by: aal80 <[email protected]> Co-authored-by: Seshu Brahma <[email protected]> Co-authored-by: Heitor Lessa <[email protected]>
- Loading branch information
1 parent
e441c0b
commit 35fb001
Showing
38 changed files
with
2,435 additions
and
160 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from aws_lambda_powertools.utilities.data_masking.base import DataMasking | ||
|
||
__all__ = [ | ||
"DataMasking", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
import json | ||
from typing import Optional, Union | ||
|
||
from aws_lambda_powertools.utilities.data_masking.provider import BaseProvider | ||
|
||
|
||
class DataMasking:
    """
    Mask, encrypt, or decrypt sensitive data through a pluggable provider.

    Each public method (``mask``, ``encrypt``, ``decrypt``) forwards to the method of the
    same name on ``self.provider``. Without ``fields`` the provider receives the whole input;
    with ``fields`` the provider is applied only to the values found at those paths inside a
    dictionary (or a JSON string that decodes to one), and the surrounding structure is kept.

    Example:
    ```
    from aws_lambda_powertools.utilities.data_masking.base import DataMasking

    def lambda_handler(event, context):
        masker = DataMasking()

        data = {
            "project": "powertools",
            "sensitive": "xxxxxxxxxx"
        }

        masked = masker.mask(data, fields=["sensitive"])

        return masked
    ```
    """

    def __init__(self, provider: Optional["BaseProvider"] = None):
        # Fall back to the base provider when none is supplied
        self.provider = provider or BaseProvider()

    def encrypt(self, data, fields=None, **provider_options):
        """Apply ``self.provider.encrypt`` to ``data``, or only to ``fields`` within it when given."""
        return self._apply_action(data, fields, self.provider.encrypt, **provider_options)

    def decrypt(self, data, fields=None, **provider_options):
        """Apply ``self.provider.decrypt`` to ``data``, or only to ``fields`` within it when given."""
        return self._apply_action(data, fields, self.provider.decrypt, **provider_options)

    def mask(self, data, fields=None, **provider_options):
        """Apply ``self.provider.mask`` to ``data``, or only to ``fields`` within it when given."""
        return self._apply_action(data, fields, self.provider.mask, **provider_options)

    def _apply_action(self, data, fields, action, **provider_options):
        """
        Apply ``action`` to the entire input, or to selected fields when ``fields`` is given.

        Parameters
        ----------
        data : any
            The input data to process.
        fields : Optional[List[any]] = None
            Fields to apply the action to. If ``None``, the action is applied to the entire ``data``.
            An empty list is not ``None``: it selects the per-field path and changes nothing.
        action : Callable
            Callable that receives a value (plus ``provider_options``) and returns the modified value.
        **provider_options
            Keyword arguments forwarded untouched to ``action``.

        Returns
        -------
        any
            The modified data after applying the action.
        """
        if fields is None:
            return action(data, **provider_options)
        return self._apply_action_to_fields(data, fields, action, **provider_options)

    @staticmethod
    def _split_field_path(field) -> list:
        """Turn one ``fields`` entry into the list of string keys leading to the target value."""
        if isinstance(field, str):
            # 'a.b.c' -> ['a', 'b', 'c']
            return field.split(".")

        if isinstance(field, (list, tuple)) and field:
            # Explicit path: one element per nesting level, so a key may itself contain dots.
            # Non-string elements get the same JSON coercion the data's own keys went through.
            return [key if isinstance(key, str) else json.dumps(key) for key in field]

        # Any other value (e.g. an int key) is rendered the way JSON renders dictionary keys,
        # because the data is round-tripped through JSON before traversal.
        return json.dumps(field).split(".")

    def _apply_action_to_fields(
        self,
        data: Union[dict, str],
        fields: list,
        action,
        **provider_options,
    ) -> Union[dict, str]:
        """
        Apply ``action`` to the values located at ``fields`` inside a dictionary or JSON string.

        The input is never modified in place: a JSON string is parsed, and a dictionary is
        round-tripped through JSON, which copies it and converts non-string keys to strings.
        The parsed structure is what gets returned, even when a JSON string was passed in.

        Parameters
        ----------
        data : Union[dict, str]
            The input data to process. It can be either a dictionary or a JSON string.
        fields : List
            Fields to apply the action to. Each field is either a dot-separated string
            (``"a.b.c"``) or a non-empty list/tuple of keys (``["a", "b", "c"]``) naming the
            nested keys to follow. Non-string keys are converted to their JSON representation.
        action : Callable
            Callable that takes the current value of the field as the first argument, plus
            ``provider_options`` as keyword arguments, and returns the replacement value.
        **provider_options
            Additional keyword arguments to pass to the ``action`` function.

        Returns
        -------
        dict
            The parsed data with the action applied to the specified fields.

        Raises
        ------
        ValueError
            If ``fields`` is None, or if ``data`` is a string that is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        TypeError
            If ``data`` is neither a dictionary nor a string.
        KeyError
            If a key along one of the field paths does not exist.

        Example
        -------
        ```python
        >>> data = {'a': {'b': {'c': 1}}, 'x': {'y': 2}}
        >>> fields = ['a.b.c', 'x.y']
        # The function will transform the value at 'a.b.c' (1) and 'x.y' (2)
        # and store the result as:
        new_dict = {'a': {'b': {'c': 'transformed_value'}}, 'x': {'y': 'transformed_value'}}
        ```
        """
        if fields is None:
            raise ValueError("No fields specified.")

        if isinstance(data, str):
            # Parse JSON string as dictionary
            parsed = json.loads(data)
        elif isinstance(data, dict):
            # Round-trip through JSON so keys that are not strings (i.e. ints) become strings
            # and the caller's dictionary is left untouched
            parsed = json.loads(json.dumps(data))
        else:
            raise TypeError(
                f"Unsupported data type for 'data' parameter. Expected a traversable type, but got {type(data)}.",
            )

        for field in fields:
            *parent_keys, leaf_key = self._split_field_path(field)

            # Walk down to the dictionary that directly holds the target value
            node = parsed
            for key in parent_keys:
                node = node[key]

            # Replace the target value with the result of the action
            node[leaf_key] = action(node[leaf_key], **provider_options)

        return parsed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
# Placeholder text substituted for a value when it is masked.
DATA_MASKING_STRING: str = "*****"
# NOTE(review): the three limits below are not referenced by any code visible alongside this
# module; presumably they tune data-key caching in the encryption provider — confirm there.
CACHE_CAPACITY: int = 100
MAX_CACHE_AGE_SECONDS: float = 300.0
MAX_MESSAGES_ENCRYPTED: int = 200
# NOTE: You can also set max messages/bytes per data key
5 changes: 5 additions & 0 deletions
5
aws_lambda_powertools/utilities/data_masking/provider/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from aws_lambda_powertools.utilities.data_masking.provider.base import BaseProvider | ||
|
||
__all__ = [ | ||
"BaseProvider", | ||
] |
34 changes: 34 additions & 0 deletions
34
aws_lambda_powertools/utilities/data_masking/provider/base.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
import json | ||
from typing import Any | ||
|
||
from aws_lambda_powertools.utilities.data_masking.constants import DATA_MASKING_STRING | ||
|
||
|
||
class BaseProvider:
    """
    Base class for data masking providers.

    It supplies JSON (de)serialization helpers and a default, irreversible ``mask``.
    ``encrypt`` and ``decrypt`` are placeholders: calling either one on a provider that has not
    overridden it raises ``NotImplementedError``.

    All three operations accept arbitrary keyword options so that callers can forward
    provider-specific settings without knowing which provider is in use; this base class
    ignores them.
    """

    def __init__(self, json_serializer=None, json_deserializer=None) -> None:
        # Custom callables take precedence; otherwise use the UTF-8 JSON defaults below
        self.json_serializer = json_serializer or self.default_json_serializer
        self.json_deserializer = json_deserializer or self.default_json_deserializer

    def default_json_serializer(self, data):
        """Serialize ``data`` to JSON and return it as UTF-8 encoded bytes."""
        return json.dumps(data).encode("utf-8")

    def default_json_deserializer(self, data):
        """Decode UTF-8 ``data`` (bytes) and parse it as JSON."""
        return json.loads(data.decode("utf-8"))

    def encrypt(self, data, **provider_options) -> str:
        """Encrypt ``data``. Not provided by the base class; always raises ``NotImplementedError``."""
        raise NotImplementedError("Subclasses must implement encrypt()")

    def decrypt(self, data, **provider_options) -> Any:
        """Decrypt ``data``. Not provided by the base class; always raises ``NotImplementedError``."""
        raise NotImplementedError("Subclasses must implement decrypt()")

    def mask(self, data, **provider_options) -> Any:
        """
        Replace ``data`` with the masking placeholder.

        Lists, tuples, and sets keep their type and receive one placeholder per original
        element (a set therefore collapses to at most one element, since the placeholders are
        equal). Every other value, including strings, bytes, and dictionaries, is replaced by
        the placeholder string itself. ``provider_options`` are accepted and ignored.
        """
        if isinstance(data, (list, tuple, set)):
            return type(data)([DATA_MASKING_STRING] * len(data))
        return DATA_MASKING_STRING
5 changes: 5 additions & 0 deletions
5
aws_lambda_powertools/utilities/data_masking/provider/kms/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from aws_lambda_powertools.utilities.data_masking.provider.kms.aws_encryption_sdk import AwsEncryptionSdkProvider | ||
|
||
__all__ = [ | ||
"AwsEncryptionSdkProvider", | ||
] |
Oops, something went wrong.