Skip to content

Commit

Permalink
Create relationship rule and validate ImageId on Instance (#3513)
Browse files Browse the repository at this point in the history
* Create rule E3673 to validate ImageId being required on an instance
* Create rule E3049 to validate ECS Task/Service and LB target configuration with dynamic host ports
  • Loading branch information
kddejong authored Jul 26, 2024
1 parent fbdd7fa commit e46a0d2
Show file tree
Hide file tree
Showing 18 changed files with 1,791 additions and 39 deletions.
51 changes: 25 additions & 26 deletions src/cfnlint/conditions/conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,22 @@ def build_scenarios(
# formatting or just the wrong condition name
return

def _build_cfn_implies(self, scenarios) -> And:
conditions = []
for condition_name, opt in scenarios.items():
if opt:
conditions.append(
self._conditions[condition_name].build_true_cnf(self._solver_params)
)
else:
conditions.append(
self._conditions[condition_name].build_false_cnf(
self._solver_params
)
)

return And(*conditions)

def check_implies(self, scenarios: dict[str, bool], implies: str) -> bool:
"""Based on a bunch of scenario conditions and their Truth/False value
determine if implies condition is True any time the scenarios are satisfied
Expand All @@ -260,36 +276,18 @@ def check_implies(self, scenarios: dict[str, bool], implies: str) -> bool:
if not scenarios.get(implies, True):
return False

conditions = []
for condition_name, opt in scenarios.items():
if opt:
conditions.append(
self._conditions[condition_name].build_true_cnf(
self._solver_params
)
)
else:
conditions.append(
self._conditions[condition_name].build_false_cnf(
self._solver_params
)
)

and_condition = self._build_cfn_implies(scenarios)
cnf.add_prop(and_condition)
implies_condition = self._conditions[implies].build_true_cnf(
self._solver_params
)
cnf.add_prop(Not(Implies(and_condition, implies_condition)))

and_condition = And(*conditions)
cnf.add_prop(and_condition)

# if the implies condition has to be true already then we don't
# need to imply it
if not scenarios.get(implies):
cnf.add_prop(Not(Implies(and_condition, implies_condition)))
if satisfiable(cnf):
return True
results = satisfiable(cnf)
if results:
return False

return False
return True
except KeyError:
# KeyError is because the listed condition doesn't exist because of bad
# formatting or just the wrong condition name
Expand Down Expand Up @@ -354,7 +352,8 @@ def satisfiable(
determine if the conditions are satisfied
Args:
condition_names (list[str]): A list of condition names
condition_names (dict[str, bool]): A list of condition names with if
they are True or False
Returns:
bool: True if the conditions are satisfied
Expand Down
6 changes: 6 additions & 0 deletions src/cfnlint/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ class Resource(_Ref):
"""

type: str = field(init=False)
condition: str | None = field(init=False, default=None)
resource: InitVar[Any]

def __post_init__(self, resource) -> None:
Expand All @@ -369,6 +370,11 @@ def __post_init__(self, resource) -> None:
if self.type.startswith("Custom::"):
self.type = "AWS::CloudFormation::CustomResource"

c = resource.get("Condition")
if not isinstance(t, str):
raise ValueError("Condition must be a string")
self.condition = c

@property
def get_atts(self, region: str = "us-east-1") -> AttributeDict:
return PROVIDER_SCHEMA_MANAGER.get_type_getatts(self.type, region)
Expand Down
10 changes: 4 additions & 6 deletions src/cfnlint/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

from cfnlint.rules._rule import CloudFormationLintRule, Match, RuleMatch
from cfnlint.rules._rules import Rules, RulesCollection
from cfnlint.rules.jsonschema import (
CfnLintJsonSchema,
CfnLintJsonSchemaRegional,
CfnLintKeyword,
SchemaDetails,
)
from cfnlint.rules.jsonschema import CfnLintJsonSchema # type: ignore
from cfnlint.rules.jsonschema import CfnLintJsonSchemaRegional # type: ignore
from cfnlint.rules.jsonschema import CfnLintKeyword # type: ignore
from cfnlint.rules.jsonschema import SchemaDetails
12 changes: 12 additions & 0 deletions src/cfnlint/rules/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from cfnlint.rules.helpers.get_resource_by_name import get_resource_by_name
from cfnlint.rules.helpers.get_value_from_path import get_value_from_path

__all__ = [
"get_value_from_path",
"get_resource_by_name",
]
50 changes: 50 additions & 0 deletions src/cfnlint/rules/helpers/get_resource_by_name.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

from collections import deque
from typing import Any, Sequence

from cfnlint.context import Path
from cfnlint.context.conditions import Unsatisfiable
from cfnlint.jsonschema import Validator


def get_resource_by_name(
validator: Validator, name: str, types: Sequence[str] | None = None
) -> tuple[Any, Validator]:

resource = validator.context.resources.get(name)
if not resource:
return None, validator

if types and resource.type not in types:
return None, validator

if resource.condition:
try:
validator = validator.evolve(
context=validator.context.evolve(
conditions=validator.context.conditions.evolve(
{
resource.condition: True,
}
),
)
)
except Unsatisfiable:
return None, validator

validator = validator.evolve(
context=validator.context.evolve(
path=Path(
path=deque(["Resources", name]),
cfn_path=deque(["Resources", resource.type]),
)
)
)

return validator.cfn.template.get("Resources", {}).get(name), validator
119 changes: 119 additions & 0 deletions src/cfnlint/rules/helpers/get_value_from_path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

from collections import deque
from typing import Any, Iterator

from cfnlint.context.conditions import Unsatisfiable
from cfnlint.helpers import is_function
from cfnlint.jsonschema import Validator


def _get_relationship_fn_if(
validator: Validator, key: Any, value: Any, path: deque[str | int]
) -> Iterator[tuple[Any, Validator]]:
if not isinstance(value, list) or len(value) != 3:
return
condition = value[0]

for i in [1, 2]:
try:
if_validator = validator.evolve(
context=validator.context.evolve(
conditions=validator.context.conditions.evolve(
status={
condition: True if i == 1 else False,
},
),
path=validator.context.path.descend(path=key).descend(path=i),
)
)
for r, v in get_value_from_path(
if_validator,
value[i],
path.copy(),
):
yield r, v
except Unsatisfiable:
pass


def _get_value_from_path_list(
validator: Validator, instance: Any, path: deque[str | int]
) -> Iterator[tuple[Any, Validator]]:
for i, v in enumerate(instance):
for r, v in get_value_from_path(
validator.evolve(
context=validator.context.evolve(
path=validator.context.path.descend(path=i)
),
),
v,
path.copy(),
):
yield r, v


def get_value_from_path(
validator: Validator, instance: Any, path: deque[str | int]
) -> Iterator[tuple[Any, Validator]]:
"""
Retrieve a value from a nested dictionary or list using a path.
Args:
validator (Validator): The validator instance
data (Any): The dictionary or list to search.
path (deque[str | int]): The path to the value.
Returns:
The value at the specified path, or None if the key doesn't exist.
Examples:
>>> data = {'a': {'b': {'c': 3}}}
>>> get_value_from_path(data, ['a', 'b', 'c'])
3
"""

fn_k, fn_v = is_function(instance)
if fn_k is not None:
if fn_k == "Fn::If":
yield from _get_relationship_fn_if(validator, fn_k, fn_v, path)
elif fn_k == "Ref" and fn_v == "AWS::NoValue":
yield None, validator.evolve(
context=validator.context.evolve(
path=validator.context.path.descend(path=fn_k)
)
)
elif not path:
yield instance, validator
return

if not path:
yield instance, validator
return

key = path.popleft()
if isinstance(instance, list) and key == "*":
yield from _get_value_from_path_list(validator, instance, path)
return

if not isinstance(instance, dict):
yield None, validator
return

for r, v in get_value_from_path(
validator.evolve(
context=validator.context.evolve(
path=validator.context.path.descend(path=key)
)
),
instance.get(key),
path.copy(),
):
yield r, v

return
4 changes: 2 additions & 2 deletions src/cfnlint/rules/jsonschema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@

__all__ = [
"BaseJsonSchema",
"CfnLintKeyword",
"CfnLintJsonSchema",
"SchemaDetails",
"CfnLintJsonSchemaRegional",
"CfnLintKeyword",
"MaxProperties",
"PropertyNames",
"SchemaDetails",
]
Loading

0 comments on commit e46a0d2

Please sign in to comment.