Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(feat-flags): new simple feature toggles rule engine #494

Merged
merged 12 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions aws_lambda_powertools/utilities/feature_toggles/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Advanced feature toggles utility
"""
from .appconfig_fetcher import AppConfigFetcher
from .configuration_store import ConfigurationStore
from .exceptions import ConfigurationException
from .schema import ACTION, SchemaValidator
from .schema_fetcher import SchemaFetcher

__all__ = [
"ConfigurationException",
"ConfigurationStore",
"ACTION",
"SchemaValidator",
"AppConfigFetcher",
"SchemaFetcher",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import logging
from typing import Any, Dict, Optional

from botocore.config import Config

from aws_lambda_powertools.utilities.parameters import AppConfigProvider, GetParameterError, TransformParameterError

from .exceptions import ConfigurationException
from .schema_fetcher import SchemaFetcher

logger = logging.getLogger(__name__)


TRANSFORM_TYPE = "json"


class AppConfigFetcher(SchemaFetcher):
def __init__(
self,
environment: str,
service: str,
configuration_name: str,
cache_seconds: int,
config: Optional[Config] = None,
):
"""This class fetches JSON schemas from AWS AppConfig

Args:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to use the mypy doc strings

environment (str): what appconfig environment to use 'dev/test' etc.
service (str): what service name to use from the supplied environment
configuration_name (str): what configuration to take from the environment & service combination
cache_seconds (int): cache expiration time, how often to call AppConfig to fetch latest configuration
config (Optional[Config]): boto3 client configuration
"""
super().__init__(configuration_name, cache_seconds)
self._logger = logger
self._conf_store = AppConfigProvider(environment=environment, application=service, config=config)

def get_json_configuration(self) -> Dict[str, Any]:
"""Get configuration string from AWs AppConfig and return the parsed JSON dictionary

Raises:
ConfigurationException: Any validation error or appconfig error that can occur

Returns:
Dict[str, Any]: parsed JSON dictionary
"""
try:
return self._conf_store.get(
name=self.configuration_name,
transform=TRANSFORM_TYPE,
max_age=self._cache_seconds,
) # parse result conf as JSON, keep in cache for self.max_age seconds
except (GetParameterError, TransformParameterError) as exc:
error_str = f"unable to get AWS AppConfig configuration file, exception={str(exc)}"
self._logger.error(error_str)
raise ConfigurationException(error_str)
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import logging
from typing import Any, Dict, List, Optional

from . import schema
from .exceptions import ConfigurationException
from .schema_fetcher import SchemaFetcher

logger = logging.getLogger(__name__)


class ConfigurationStore:
def __init__(self, schema_fetcher: SchemaFetcher):
"""constructor

Args:
schema_fetcher (SchemaFetcher): A schema JSON fetcher, can be AWS AppConfig, Hashicorp Consul etc.
"""
self._logger = logger
self._schema_fetcher = schema_fetcher
self._schema_validator = schema.SchemaValidator(self._logger)

def _match_by_action(self, action: str, condition_value: Any, context_value: Any) -> bool:
if not context_value:
return False
mapping_by_action = {
schema.ACTION.EQUALS.value: lambda a, b: a == b,
schema.ACTION.STARTSWITH.value: lambda a, b: a.startswith(b),
schema.ACTION.ENDSWITH.value: lambda a, b: a.endswith(b),
schema.ACTION.CONTAINS.value: lambda a, b: a in b,
}

try:
func = mapping_by_action.get(action, lambda a, b: False)
return func(context_value, condition_value)
except Exception as exc:
self._logger.error(f"caught exception while matching action, action={action}, exception={str(exc)}")
return False

def _is_rule_matched(self, feature_name: str, rule: Dict[str, Any], rules_context: Dict[str, Any]) -> bool:
rule_name = rule.get(schema.RULE_NAME_KEY, "")
rule_default_value = rule.get(schema.RULE_DEFAULT_VALUE)
conditions: Dict[str, str] = rule.get(schema.CONDITIONS_KEY)

for condition in conditions:
context_value = rules_context.get(condition.get(schema.CONDITION_KEY))
if not self._match_by_action(
condition.get(schema.CONDITION_ACTION),
condition.get(schema.CONDITION_VALUE),
context_value,
):
logger.debug(
f"rule did not match action, rule_name={rule_name}, rule_default_value={rule_default_value}, feature_name={feature_name}, context_value={str(context_value)}" # noqa: E501
)
# context doesn't match condition
return False
# if we got here, all conditions match
logger.debug(
f"rule matched, rule_name={rule_name}, rule_default_value={rule_default_value}, feature_name={feature_name}" # noqa: E501
)
return True

def _handle_rules(
pcolazurdo marked this conversation as resolved.
Show resolved Hide resolved
pcolazurdo marked this conversation as resolved.
Show resolved Hide resolved
self,
*,
feature_name: str,
rules_context: Dict[str, Any],
feature_default_value: bool,
rules: List[Dict[str, Any]],
) -> bool:
for rule in rules:
rule_default_value = rule.get(schema.RULE_DEFAULT_VALUE)
if self._is_rule_matched(feature_name, rule, rules_context):
return rule_default_value
pcolazurdo marked this conversation as resolved.
Show resolved Hide resolved
# no rule matched, return default value of feature
logger.debug(
f"no rule matched, returning default value of feature, feature_default_value={feature_default_value}, feature_name={feature_name}" # noqa: E501
)
return feature_default_value

def get_configuration(self) -> Dict[str, Any]:
"""Get configuration string from AWs AppConfig and returned the parsed JSON dictionary

Raises:
ConfigurationException: Any validation error or appconfig error that can occur

Returns:
Dict[str, Any]: parsed JSON dictionary
"""
schema: Dict[
str, Any
] = (
self._schema_fetcher.get_json_configuration()
) # parse result conf as JSON, keep in cache for self.max_age seconds
# validate schema
self._schema_validator.validate_json_schema(schema)
return schema

def get_feature_toggle(
pcolazurdo marked this conversation as resolved.
Show resolved Hide resolved
self, *, feature_name: str, rules_context: Optional[Dict[str, Any]] = None, value_if_missing: bool
) -> bool:
"""get a feature toggle boolean value. Value is calculated according to a set of rules and conditions.
see below for explanation.

Args:
feature_name (str): feature name that you wish to fetch
rules_context (Optional[Dict[str, Any]]): dict of attributes that you would like to match the rules
against, can be {'tenant_id: 'X', 'username':' 'Y', 'region': 'Z'} etc.
value_if_missing (bool): this will be the returned value in case the feature toggle doesn't exist in
the schema or there has been an error while fetching the
configuration from appconfig

Returns:
bool: calculated feature toggle value. several possibilities:
1. if the feature doesn't appear in the schema or there has been an error fetching the
configuration -> error/warning log would appear and value_if_missing is returned
2. feature exists and has no rules or no rules have matched -> return feature_default_value of
the defined feature
3. feature exists and a rule matches -> rule_default_value of rule is returned
"""
if rules_context is None:
rules_context = {}

try:
toggles_dict: Dict[str, Any] = self.get_configuration()
except ConfigurationException:
logger.error("unable to get feature toggles JSON, returning provided value_if_missing value") # noqa: E501
return value_if_missing

feature: Dict[str, Dict] = toggles_dict.get(schema.FEATURES_KEY, {}).get(feature_name, None)
if feature is None:
logger.warning(
f"feature does not appear in configuration, using provided value_if_missing, feature_name={feature_name}, value_if_missing={value_if_missing}" # noqa: E501
)
return value_if_missing

rules_list = feature.get(schema.RULES_KEY)
feature_default_value = feature.get(schema.FEATURE_DEFAULT_VAL_KEY)
if not rules_list:
# not rules but has a value
logger.debug(
f"no rules found, returning feature default value, feature_name={feature_name}, default_value={feature_default_value}" # noqa: E501
)
return feature_default_value
# look for first rule match
logger.debug(
f"looking for rule match, feature_name={feature_name}, feature_default_value={feature_default_value}"
) # noqa: E501
return self._handle_rules(
feature_name=feature_name,
rules_context=rules_context,
feature_default_value=feature_default_value,
rules=rules_list,
)

def get_all_enabled_feature_toggles(self, *, rules_context: Optional[Dict[str, Any]] = None) -> List[str]:
"""Get all enabled feature toggles while also taking into account rule_context (when a feature has defined rules)

Args:
rules_context (Optional[Dict[str, Any]]): dict of attributes that you would like to match the rules
against, can be {'tenant_id: 'X', 'username':' 'Y', 'region': 'Z'} etc.

Returns:
List[str]: a list of all features name that are enabled by also taking into account
rule_context (when a feature has defined rules)
"""
if rules_context is None:
rules_context = {}
try:
toggles_dict: Dict[str, Any] = self.get_configuration()
except ConfigurationException:
logger.error("unable to get feature toggles JSON") # noqa: E501
return []
ret_list = []
features: Dict[str, Any] = toggles_dict.get(schema.FEATURES_KEY, {})
for feature_name, feature_dict_def in features.items():
rules_list = feature_dict_def.get(schema.RULES_KEY, [])
feature_default_value = feature_dict_def.get(schema.FEATURE_DEFAULT_VAL_KEY)
if feature_default_value and not rules_list:
self._logger.debug(
f"feature is enabled by default and has no defined rules, feature_name={feature_name}"
)
ret_list.append(feature_name)
elif self._handle_rules(
feature_name=feature_name,
rules_context=rules_context,
feature_default_value=feature_default_value,
rules=rules_list,
):
self._logger.debug(f"feature's calculated value is True, feature_name={feature_name}")
ret_list.append(feature_name)
return ret_list
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class ConfigurationException(Exception):
"""When a a configuration store raises an exception on config retrieval or parsing"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""When a a configuration store raises an exception on config retrieval or parsing"""
"""When a configuration store raises an exception on config retrieval or parsing"""

83 changes: 83 additions & 0 deletions aws_lambda_powertools/utilities/feature_toggles/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from enum import Enum
from typing import Any, Dict

from .exceptions import ConfigurationException

FEATURES_KEY = "features"
RULES_KEY = "rules"
FEATURE_DEFAULT_VAL_KEY = "feature_default_value"
CONDITIONS_KEY = "conditions"
RULE_NAME_KEY = "rule_name"
RULE_DEFAULT_VALUE = "value_when_applies"
CONDITION_KEY = "key"
CONDITION_VALUE = "value"
CONDITION_ACTION = "action"


class ACTION(str, Enum):
EQUALS = "EQUALS"
STARTSWITH = "STARTSWITH"
ENDSWITH = "ENDSWITH"
CONTAINS = "CONTAINS"


class SchemaValidator:
def __init__(self, logger: object):
self._logger = logger

def _raise_conf_exc(self, error_str: str) -> None:
self._logger.error(error_str)
raise ConfigurationException(error_str)

def _validate_condition(self, rule_name: str, condition: Dict[str, str]) -> None:
if not condition or not isinstance(condition, dict):
self._raise_conf_exc(f"invalid condition type, not a dictionary, rule_name={rule_name}")
action = condition.get(CONDITION_ACTION, "")
if action not in [ACTION.EQUALS.value, ACTION.STARTSWITH.value, ACTION.ENDSWITH.value, ACTION.CONTAINS.value]:
self._raise_conf_exc(f"invalid action value, rule_name={rule_name}, action={action}")
key = condition.get(CONDITION_KEY, "")
if not key or not isinstance(key, str):
self._raise_conf_exc(f"invalid key value, key has to be a non empty string, rule_name={rule_name}")
value = condition.get(CONDITION_VALUE, "")
if not value:
self._raise_conf_exc(f"missing condition value, rule_name={rule_name}")

def _validate_rule(self, feature_name: str, rule: Dict[str, Any]) -> None:
if not rule or not isinstance(rule, dict):
self._raise_conf_exc(f"feature rule is not a dictionary, feature_name={feature_name}")
rule_name = rule.get(RULE_NAME_KEY)
if not rule_name or rule_name is None or not isinstance(rule_name, str):
self._raise_conf_exc(f"invalid rule_name, feature_name={feature_name}")
rule_default_value = rule.get(RULE_DEFAULT_VALUE)
if rule_default_value is None or not isinstance(rule_default_value, bool):
self._raise_conf_exc(f"invalid rule_default_value, rule_name={rule_name}")
conditions = rule.get(CONDITIONS_KEY, {})
if not conditions or not isinstance(conditions, list):
self._raise_conf_exc(f"invalid condition, rule_name={rule_name}")
# validate conditions
for condition in conditions:
self._validate_condition(rule_name, condition)

def _validate_feature(self, feature_name: str, feature_dict_def: Dict[str, Any]) -> None:
if not feature_dict_def or not isinstance(feature_dict_def, dict):
self._raise_conf_exc(f"invalid AWS AppConfig JSON schema detected, feature {feature_name} is invalid")
feature_default_value = feature_dict_def.get(FEATURE_DEFAULT_VAL_KEY)
if feature_default_value is None or not isinstance(feature_default_value, bool):
self._raise_conf_exc(f"missing feature_default_value for feature, feature_name={feature_name}")
# validate rules
rules = feature_dict_def.get(RULES_KEY, [])
if not rules:
return
if not isinstance(rules, list):
self._raise_conf_exc(f"feature rules is not a list, feature_name={feature_name}")
for rule in rules:
self._validate_rule(feature_name, rule)

def validate_json_schema(self, schema: Dict[str, Any]) -> None:
if not isinstance(schema, dict):
self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, root schema is not a dictionary")
features_dict: Dict = schema.get(FEATURES_KEY)
if not isinstance(features_dict, dict):
self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, missing features dictionary")
for feature_name, feature_dict_def in features_dict.items():
self._validate_feature(feature_name, feature_dict_def)
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from abc import ABC, abstractclassmethod
from typing import Any, Dict


class SchemaFetcher(ABC):
def __init__(self, configuration_name: str, cache_seconds: int):
self.configuration_name = configuration_name
self._cache_seconds = cache_seconds

@abstractclassmethod
def get_json_configuration(self) -> Dict[str, Any]:
"""Get configuration string from any configuration storing service and return the parsed JSON dictionary

Raises:
ConfigurationException: Any error that can occur during schema fetch or JSON parse

Returns:
Dict[str, Any]: parsed JSON dictionary
"""
return None
2 changes: 1 addition & 1 deletion aws_lambda_powertools/utilities/parameters/appconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def get_app_config(
>>> print(value)
My configuration value

**Retrieves a confiugration value and decodes it using a JSON decoder**
**Retrieves a configuration value and decodes it using a JSON decoder**

>>> from aws_lambda_powertools.utilities.parameters import get_parameter
>>>
Expand Down
Empty file.
Loading