Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/confcom/azext_confcom/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
ACI_FIELD_YAML_READINESS_PROBE = "readinessProbe"
ACI_FIELD_YAML_STARTUP_PROBE = "startupProbe"
VIRTUAL_NODE_YAML_METADATA = "metadata"
VIRTUAL_NODE_YAML_COMMAND = "command"
VIRTUAL_NODE_YAML_NAME = "name"
VIRTUAL_NODE_YAML_ANNOTATIONS = "annotations"
VIRTUAL_NODE_YAML_LABELS = "labels"
Expand All @@ -103,6 +104,7 @@
VIRTUAL_NODE_YAML_RESOURCES_HUGEPAGES = "hugepages"
VIRTUAL_NODE_YAML_RESOURCES_EPHEMERAL_STORAGE = "ephemeral-storage"
VIRTUAL_NODE_YAML_SPECIAL_ENV_VAR_REGEX = "^===VIRTUALNODE2.CC.THIM.(.+)===$"
VIRTUAL_NODE_YAML_READ_ONLY_MANY = "ReadOnlyMany"

# output json values
POLICY_FIELD_CONTAINERS = "containers"
Expand Down
50 changes: 35 additions & 15 deletions src/confcom/azext_confcom/security_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
get_diff_size,
process_env_vars_from_yaml,
convert_to_pod_spec,
get_volume_claim_templates,
filter_non_pod_resources,
decompose_confidential_properties,
process_env_vars_from_config,
Expand Down Expand Up @@ -1051,7 +1052,7 @@ def load_policy_from_virtual_node_yaml_str(
yaml_contents = filter_non_pod_resources(yaml_contents)
for yaml in yaml_contents:
# extract existing policy and fragments for diff mode
metadata = case_insensitive_dict_get(yaml, "metadata")
metadata = case_insensitive_dict_get(yaml, config.VIRTUAL_NODE_YAML_METADATA)
annotations = case_insensitive_dict_get(metadata, config.VIRTUAL_NODE_YAML_ANNOTATIONS)
labels = case_insensitive_dict_get(metadata, config.VIRTUAL_NODE_YAML_LABELS) or []
use_workload_identity = (
Expand All @@ -1070,6 +1071,7 @@ def load_policy_from_virtual_node_yaml_str(
existing_containers, existing_fragments = ([], [])
# because there are many ways to get pod information, we normalize them so the interface is the same
normalized_yaml = convert_to_pod_spec(yaml)
volume_claim_templates = get_volume_claim_templates(yaml)

spec = case_insensitive_dict_get(normalized_yaml, "spec")
if not spec:
Expand All @@ -1079,17 +1081,17 @@ def load_policy_from_virtual_node_yaml_str(
pod_security_context = case_insensitive_dict_get(spec, "securityContext") or {}

policy_containers = []
containers = case_insensitive_dict_get(spec, "containers")
containers = case_insensitive_dict_get(spec, config.ACI_FIELD_TEMPLATE_CONTAINERS)
if not containers:
eprint("YAML file does not contain a containers field")
# NOTE: initContainers are not treated differently in the security policy
# but they are treated differently in the pod spec
# e.g. lifecycle and probes are not supported in initContainers
init_containers = case_insensitive_dict_get(spec, "initContainers") or []
init_containers = case_insensitive_dict_get(spec, config.ACI_FIELD_TEMPLATE_INIT_CONTAINERS) or []

for container in containers + init_containers:
# image and name
image = case_insensitive_dict_get(container, "image")
image = case_insensitive_dict_get(container, config.ACI_FIELD_TEMPLATE_IMAGE)
if not image:
eprint("Container does not have an image field")

Expand All @@ -1104,45 +1106,63 @@ def load_policy_from_virtual_node_yaml_str(
envs += config.VIRTUAL_NODE_ENV_RULES_WORKLOAD_IDENTITY

# command
command = case_insensitive_dict_get(container, "command") or []
command = case_insensitive_dict_get(container, config.VIRTUAL_NODE_YAML_COMMAND) or []
args = case_insensitive_dict_get(container, "args") or []

# mounts
mounts = copy.deepcopy(config.DEFAULT_MOUNTS_VIRTUAL_NODE)
volumes = case_insensitive_dict_get(spec, "volumes") or []

# there can be implicit volumes from volumeClaimTemplates
# We need to add them to the list of volumes and note if they are readonly
for volume_claim_template in volume_claim_templates:
vct_metadata = case_insensitive_dict_get(volume_claim_template, config.VIRTUAL_NODE_YAML_METADATA)
temp_volume = {
config.VIRTUAL_NODE_YAML_NAME:
case_insensitive_dict_get(vct_metadata, config.VIRTUAL_NODE_YAML_NAME),
}
vct_spec = case_insensitive_dict_get(volume_claim_template, "spec")
if vct_spec:
vct_access_modes = case_insensitive_dict_get(vct_spec, "accessModes")
Copy link

Copilot AI Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a null check for vct_access_modes before checking its contents to avoid potential errors.

Suggested change
vct_access_modes = case_insensitive_dict_get(vct_spec, "accessModes")
if vct_access_modes and config.VIRTUAL_NODE_YAML_READ_ONLY_MANY in vct_access_modes:

Copilot uses AI. Check for mistakes.
if vct_access_modes and config.VIRTUAL_NODE_YAML_READ_ONLY_MANY in vct_access_modes:
temp_volume[config.ACI_FIELD_TEMPLATE_MOUNTS_READONLY] = True

volumes.append(temp_volume)

# set of volume types that are read-only by default
read_only_types = {"configMap", "secret", "downwardAPI", "projected"}

volume_mounts = case_insensitive_dict_get(container, "volumeMounts")
if volume_mounts:
for mount in volume_mounts:
mount_name = case_insensitive_dict_get(mount, "name")
mount_name = case_insensitive_dict_get(mount, config.VIRTUAL_NODE_YAML_NAME)
mount_path = case_insensitive_dict_get(mount, "mountPath")

# find the corresponding volume
volume = next(
(vol for vol in volumes if case_insensitive_dict_get(vol, "name") == mount_name),
(vol for vol in volumes if case_insensitive_dict_get(
vol, config.VIRTUAL_NODE_YAML_NAME) == mount_name
),
None
)
) or {}

# determine if this volume is one of the read-only types
read_only_default = any(key in read_only_types for key in volume.keys())
read_only_default = any(key in read_only_types for key in volume.keys()) or volume.get(config.ACI_FIELD_TEMPLATE_MOUNTS_READONLY)

if read_only_default:
# log warning if readOnly is explicitly set to false for a read-only volume type
if case_insensitive_dict_get(mount, "readOnly") is False:
if case_insensitive_dict_get(mount, config.ACI_FIELD_TEMPLATE_MOUNTS_READONLY) is False:
logger.warning(
"Volume '%s' in container '%s' is of a type that requires readOnly access (%s), "
"but readOnly: false was specified. Enforcing readOnly: true for policy generation.",
mount_name,
case_insensitive_dict_get(container, "name"),
case_insensitive_dict_get(container, config.VIRTUAL_NODE_YAML_NAME),
', '.join(read_only_types)
)
mount_readonly = True
else:
# use the readOnly field or default to False for non-read-only volumes
mount_readonly = case_insensitive_dict_get(mount, "readOnly") or False
mount_readonly = case_insensitive_dict_get(mount, config.ACI_FIELD_TEMPLATE_MOUNTS_READONLY) or False

mounts.append({
config.ACI_FIELD_CONTAINERS_MOUNTS_TYPE: config.ACI_FIELD_YAML_MOUNT_TYPE,
Expand All @@ -1151,9 +1171,9 @@ def load_policy_from_virtual_node_yaml_str(
})

# container security context
container_security_context = case_insensitive_dict_get(container, "securityContext") or {}
container_security_context = case_insensitive_dict_get(container, config.ACI_FIELD_TEMPLATE_SECURITY_CONTEXT) or {}

if case_insensitive_dict_get(container_security_context, "privileged") is True:
if case_insensitive_dict_get(container_security_context, config.ACI_FIELD_CONTAINERS_PRIVILEGED) is True:
mounts += config.DEFAULT_MOUNTS_PRIVILEGED_VIRTUAL_NODE

# security context
Expand All @@ -1173,7 +1193,7 @@ def load_policy_from_virtual_node_yaml_str(
policy_containers.append(
{
config.ACI_FIELD_CONTAINERS_ID: image,
config.ACI_FIELD_CONTAINERS_NAME: case_insensitive_dict_get(container, "name") or image,
config.ACI_FIELD_CONTAINERS_NAME: case_insensitive_dict_get(container, config.VIRTUAL_NODE_YAML_NAME) or image,
config.ACI_FIELD_CONTAINERS_CONTAINERIMAGE: image,
config.ACI_FIELD_CONTAINERS_ENVS: envs,
config.ACI_FIELD_CONTAINERS_COMMAND: command + args,
Expand Down
9 changes: 9 additions & 0 deletions src/confcom/azext_confcom/template_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,15 @@ def convert_to_pod_spec_helper(pod_dict):
return {}


def get_volume_claim_templates(pod_spec: dict) -> List[dict]:
volume_claim_templates = []
if "spec" in pod_spec:
spec = pod_spec["spec"]
if "volumeClaimTemplates" in spec:
return spec["volumeClaimTemplates"]
return volume_claim_templates


def filter_non_pod_resources(resources: List[dict]) -> List[dict]:
"""
Filter out non-pod spawning resources from a list of resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,41 @@ class PolicyGeneratingVirtualNode(unittest.TestCase):
- python3
"""

custom_yaml_volume_claim = """
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: web
spec:
serviceName: "nginx"
replicas: 2
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: mcr.microsoft.com/cbl-mariner/distroless/minimal:2.0
ports:
- containerPort: 80
name: web
volumeMounts:
- name: www
mountPath: /usr/share/nginx/html
volumeClaimTemplates:
- metadata:
name: www
spec:
accessModes: [ "ReadOnlyMany" ]
resources:
requests:
storage: 1Gi
"""

def test_compare_policy_sources(self):
custom_policy = load_policy_from_str(self.custom_json)
custom_policy.populate_policy_content_for_all_images()
Expand Down Expand Up @@ -328,3 +363,21 @@ def test_workload_identity(self):

for var in config.VIRTUAL_NODE_ENV_RULES_WORKLOAD_IDENTITY:
self.assertTrue(var['name'] in env_rule_names)

def test_volume_claim(self):
virtual_node_policy = load_policy_from_virtual_node_yaml_str(self.custom_yaml_volume_claim)[0]
virtual_node_policy.populate_policy_content_for_all_images()
container_start = "containers := "
containers = json.loads(extract_containers_from_text(virtual_node_policy.get_serialized_output(OutputType.PRETTY_PRINT), container_start))
# get the volume mount from the first container
mounts = [
mount
for mount in
containers[0][config.POLICY_FIELD_CONTAINERS_ELEMENTS_MOUNTS]
]
self.assertTrue("/usr/share/nginx/html" in [mount[config.POLICY_FIELD_CONTAINERS_ELEMENTS_MOUNTS_DESTINATION] for mount in mounts])
mount = [mount for mount in mounts if mount[config.POLICY_FIELD_CONTAINERS_ELEMENTS_MOUNTS_DESTINATION] == "/usr/share/nginx/html"][0]
self.assertTrue("ro" in mount[config.POLICY_FIELD_CONTAINERS_ELEMENTS_MOUNTS_OPTIONS])

# get the nginx mount and make sure it is readonly
containers[0][config.POLICY_FIELD_CONTAINERS_ELEMENTS_MOUNTS]