Skip to content

Commit

Permalink
feat(schema): use netplan API to validate network-config (canonical#4767
Browse files Browse the repository at this point in the history
)

Validation of network-config version: 2 will be performed on systems
with python3-netplan installed which delivers a python API
netplan.Parser which allows cloud-init to parse and determine specific
netplan schema errors.

The netplan API is only present on Ubuntu Noble in the python3-netplan
deb package.

Add netplan_validate_network_config function to use netplan API when
present. Wire this function into runtime schema validation at initial
boot to warn about network-config invalid version: 2 schema.

Also wire this function into cloud-init schema command to warn and
annotate specific schema errors by line in datasource provided
network-config.
  • Loading branch information
blackboxsw committed Jan 25, 2024
1 parent a3373ca commit 2586ed2
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 28 deletions.
134 changes: 125 additions & 9 deletions cloudinit/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import re
import shutil
import sys
import textwrap
from collections import defaultdict
Expand Down Expand Up @@ -34,7 +35,8 @@
from cloudinit.handlers import INCLUSION_TYPES_MAP, type_from_starts_with
from cloudinit.helpers import Paths
from cloudinit.sources import DataSourceNotFoundException
from cloudinit.util import error, get_modules_from_dir, load_file
from cloudinit.temp_utils import mkdtemp
from cloudinit.util import error, get_modules_from_dir, load_file, write_file

try:
from jsonschema import ValidationError as _ValidationError
Expand Down Expand Up @@ -574,6 +576,110 @@ def validate_cloudconfig_metaschema(validator, schema: dict, throw=True):
)


def network_schema_version(network_config: dict) -> Optional[int]:
"""Return the version of the network schema when present."""
if "network" in network_config:
return network_config["network"].get("version")
return network_config.get("version")


def netplan_validate_network_schema(
network_config: dict,
strict: bool = False,
annotate: bool = False,
log_details: bool = True,
) -> bool:
"""On systems with netplan, validate network_config schema for file
Leverage NetplanParser for error annotation line, column and detailed
errors.
@param network_config: Dict of network configuration settings validated
against
@param strict: Boolean, when True raise SchemaValidationErrors instead of
logging warnings.
@param annotate: Boolean, when True, print original network_config_file
content with error annotations
@param log_details: Boolean, when True logs details of validation errors.
If there are concerns about logging sensitive userdata, this should
be set to False.
@return: True when schema validation was performed. False when not on a
system with netplan and netplan python support.
@raises: SchemaValidationError when netplan's parser raises
NetplanParserExceptions.
"""
if network_schema_version(network_config) != 2:
return False # Netplan only validates network version 2 config

try:
from netplan import NetplanParserException, Parser # type: ignore
except ImportError:
LOG.debug("Skipping netplan schema validation. No netplan available")
return False

# netplan Parser looks at all *.yaml files in the target directory underA
# /etc/netplan. cloud-init should only validate schema of the
# network-config it generates, so create a <tmp_dir>/etc/netplan
# to validate only our network-config.
parse_dir = mkdtemp()
netplan_file = os.path.join(parse_dir, "etc/netplan/network-config.yaml")

# Datasource network config can optionally exclude top-level network key
net_cfg = deepcopy(network_config)
if "network" not in net_cfg:
net_cfg = {"network": net_cfg}

src_content = safeyaml.dumps(net_cfg)
write_file(netplan_file, src_content, mode=0o600)

parser = Parser()
errors = []
try:
# Parse all netplan *.yaml files.load_yaml_heirarchy looks for nested
# etc/netplan subdir under "/".
parser.load_yaml_hierarchy(parse_dir)
except NetplanParserException as e:
errors.append(
SchemaProblem(
"format-l{line}.c{col}".format(line=e.line, col=e.column),
f"Invalid netplan schema. {e.message}",
)
)
if os.path.exists(parse_dir):
shutil.rmtree(parse_dir)
if errors:
if strict:
if annotate:
# Load YAML marks for annotation
_, marks = safeyaml.load_with_marks(src_content)
print(
annotated_cloudconfig_file(
net_cfg,
src_content,
marks,
schema_errors=errors,
)
)
raise SchemaValidationError(errors)
if log_details:
message = _format_schema_problems(
errors,
prefix=(
f"Invalid {SchemaType.NETWORK_CONFIG.value} provided:\n"
),
separator="\n",
)
else:
message = (
f"Invalid {SchemaType.NETWORK_CONFIG.value} provided: "
"Please run 'sudo cloud-init schema --system' to "
"see the schema errors."
)
LOG.warning(message)
return True


def validate_cloudconfig_schema(
config: dict,
schema: Optional[dict] = None,
Expand All @@ -582,7 +688,7 @@ def validate_cloudconfig_schema(
strict_metaschema: bool = False,
log_details: bool = True,
log_deprecations: bool = False,
):
) -> bool:
"""Validate provided config meets the schema definition.
@param config: Dict of cloud configuration settings validated against
Expand All @@ -608,6 +714,12 @@ def validate_cloudconfig_schema(
@raises: ValueError on invalid schema_type not in CLOUD_CONFIG or
NETWORK_CONFIG
"""
if schema_type == SchemaType.NETWORK_CONFIG:
if netplan_validate_network_schema(
network_config=config, strict=strict, log_details=log_details
):
# Schema was validated by netplan
return True
if schema is None:
schema = get_schema(schema_type)
try:
Expand All @@ -618,7 +730,7 @@ def validate_cloudconfig_schema(
)
except ImportError:
LOG.debug("Ignoring schema validation. jsonschema is not present")
return
return False

validator = cloudinitValidator(schema, format_checker=FormatChecker())

Expand Down Expand Up @@ -997,21 +1109,25 @@ def validate_cloudconfig_file(
f"{schema_type.value} {config_path} is not a YAML dict."
)
if schema_type == SchemaType.NETWORK_CONFIG:
# Pop optional top-level "network" key when present
netcfg = cloudconfig.get("network", cloudconfig)
if not netcfg:
if not cloudconfig.get("network", cloudconfig):
print("Skipping network-config schema validation on empty config.")
return False
elif netcfg.get("version") != 1:
if netplan_validate_network_schema(
network_config=cloudconfig, strict=True, annotate=annotate
):
return True # schema validation performed by netplan
if network_schema_version(cloudconfig) != 1:
# Validation requires JSON schema definition in
# cloudinit/config/schemas/schema-network-config-v1.json
print(
"Skipping network-config schema validation."
" No network schema for version:"
f" {netcfg.get('version')}"
f" {network_schema_version(cloudconfig)}"
)
return False
try:
if not validate_cloudconfig_schema(
cloudconfig, schema, strict=True, log_deprecations=False
cloudconfig, schema=schema, strict=True, log_deprecations=False
):
print(
f"Skipping {schema_type.value} schema validation."
Expand Down
7 changes: 3 additions & 4 deletions cloudinit/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -1011,15 +1011,14 @@ def should_run_on_boot_event():
netcfg, src = self._find_networking_config()
self._write_network_config_json(netcfg)

if netcfg and netcfg.get("version") == 1:
if netcfg:
validate_cloudconfig_schema(
config=netcfg,
schema_type=SchemaType.NETWORK_CONFIG,
strict=False,
log_details=True,
strict=False, # Warnings not raising exceptions
log_details=False, # May have wifi passwords in net cfg
log_deprecations=True,
)

# ensure all physical devices in config are present
self.distro.networking.wait_for_physdevs(netcfg)

Expand Down
Loading

0 comments on commit 2586ed2

Please sign in to comment.