diff --git a/cloudinit/config/schema.py b/cloudinit/config/schema.py index 875665de815e..ee9294dd1781 100644 --- a/cloudinit/config/schema.py +++ b/cloudinit/config/schema.py @@ -5,6 +5,7 @@ import logging import os import re +import shutil import sys import textwrap from collections import defaultdict @@ -34,7 +35,8 @@ from cloudinit.handlers import INCLUSION_TYPES_MAP, type_from_starts_with from cloudinit.helpers import Paths from cloudinit.sources import DataSourceNotFoundException -from cloudinit.util import error, get_modules_from_dir, load_file +from cloudinit.temp_utils import mkdtemp +from cloudinit.util import error, get_modules_from_dir, load_file, write_file try: from jsonschema import ValidationError as _ValidationError @@ -574,6 +576,112 @@ def validate_cloudconfig_metaschema(validator, schema: dict, throw=True): ) +def network_schema_version(network_config: dict) -> Optional[int]: + """Return the version of the network schema when present.""" + if "network" in network_config: + return network_config["network"].get("version") + return network_config.get("version") + + +def netplan_validate_network_schema( + network_config: dict, + strict: bool = False, + annotate: bool = False, + log_details: bool = True, +) -> bool: + """On systems with python3-netplan, validate network_config schema for file + + Leverage NetplanParser for error annotation line, column and detailed + errors. + + @param network_config: Dict of network configuration settings validated + against + @param strict: Boolean, when True raise SchemaValidationErrors instead of + logging warnings. + @param annotate: Boolean, when True, print original network_config_file + content with error annotations + @param log_details: Boolean, when True logs details of validation errors. + If there are concerns about logging sensitive userdata, this should + be set to False. + + @return: True when schema validation was performed. False when not on a + system with netplan and netplan python support. + @raises: SchemaValidationError when netplan's parser raises + NetplanParserExceptions. + """ + if network_schema_version(network_config) != 2: + return False # Netplan only validates network version 2 config + + try: + from netplan import NetplanParserException, Parser # type: ignore + except ImportError: + LOG.debug( + "Skipping netplan schema validation. No python3-netplan available" + ) + return False + + # netplan Parser looks at all *.yaml files in the target directory underA + # /etc/netplan. cloud-init should only validate schema of the + # network-config it generates, so create a /etc/netplan + # to validate only our network-config. + parse_dir = mkdtemp() + netplan_file = os.path.join(parse_dir, "etc/netplan/network-config.yaml") + + # Datasource network config can optionally exclude top-level network key + net_cfg = deepcopy(network_config) + if "network" not in net_cfg: + net_cfg = {"network": net_cfg} + + src_content = safeyaml.dumps(net_cfg) + write_file(netplan_file, src_content, mode=0o600) + + parser = Parser() + errors = [] + try: + # Parse all netplan *.yaml files.load_yaml_heirarchy looks for nested + # etc/netplan subdir under "/". + parser.load_yaml_hierarchy(parse_dir) + except NetplanParserException as e: + errors.append( + SchemaProblem( + "format-l{line}.c{col}".format(line=e.line, col=e.column), + f"Invalid netplan schema. {e.message}", + ) + ) + if os.path.exists(parse_dir): + shutil.rmtree(parse_dir) + if errors: + if strict: + if annotate: + # Load YAML marks for annotation + _, marks = safeyaml.load_with_marks(src_content) + print( + annotated_cloudconfig_file( + net_cfg, + src_content, + marks, + schema_errors=errors, + ) + ) + raise SchemaValidationError(errors) + if log_details: + message = _format_schema_problems( + errors, + prefix=( + f"Invalid {SchemaType.NETWORK_CONFIG.value} provided:\n" + ), + separator="\n", + ) + else: + message = ( + f"Invalid {SchemaType.NETWORK_CONFIG.value} provided: " + "Please run 'sudo cloud-init schema --system' to " + "see the schema errors." + ) + LOG.warning(message) + return True + + def validate_cloudconfig_schema( config: dict, schema: Optional[dict] = None, @@ -582,7 +690,7 @@ def validate_cloudconfig_schema( strict_metaschema: bool = False, log_details: bool = True, log_deprecations: bool = False, -): +) -> bool: """Validate provided config meets the schema definition. @param config: Dict of cloud configuration settings validated against @@ -608,6 +716,12 @@ def validate_cloudconfig_schema( @raises: ValueError on invalid schema_type not in CLOUD_CONFIG or NETWORK_CONFIG """ + if schema_type == SchemaType.NETWORK_CONFIG: + if netplan_validate_network_schema( + network_config=config, strict=strict, log_details=log_details + ): + # Schema was validated by python3-netplan + return True if schema is None: schema = get_schema(schema_type) try: @@ -618,7 +732,7 @@ def validate_cloudconfig_schema( ) except ImportError: LOG.debug("Ignoring schema validation. jsonschema is not present") - return + return False validator = cloudinitValidator(schema, format_checker=FormatChecker()) @@ -997,21 +1111,25 @@ def validate_cloudconfig_file( f"{schema_type.value} {config_path} is not a YAML dict." ) if schema_type == SchemaType.NETWORK_CONFIG: - # Pop optional top-level "network" key when present - netcfg = cloudconfig.get("network", cloudconfig) - if not netcfg: + if not cloudconfig.get("network", cloudconfig): print("Skipping network-config schema validation on empty config.") return False - elif netcfg.get("version") != 1: + if netplan_validate_network_schema( + network_config=cloudconfig, strict=True, annotate=annotate + ): + return True # schema validation performed by python3-netplan + if network_schema_version(cloudconfig) != 1: + # Validation validation requires JSON schema definition in + # cloudinit/config/schemas/schema-network-config-v1.json print( "Skipping network-config schema validation." " No network schema for version:" - f" {netcfg.get('version')}" + f" {network_schema_version(cloudconfig)}" ) return False try: if not validate_cloudconfig_schema( - cloudconfig, schema, strict=True, log_deprecations=False + cloudconfig, schema=schema, strict=True, log_deprecations=False ): print( f"Skipping {schema_type.value} schema validation." diff --git a/cloudinit/stages.py b/cloudinit/stages.py index 18ecc01bd9f7..ee285901793e 100644 --- a/cloudinit/stages.py +++ b/cloudinit/stages.py @@ -1011,15 +1011,14 @@ def should_run_on_boot_event(): netcfg, src = self._find_networking_config() self._write_network_config_json(netcfg) - if netcfg and netcfg.get("version") == 1: + if netcfg: validate_cloudconfig_schema( config=netcfg, schema_type=SchemaType.NETWORK_CONFIG, - strict=False, - log_details=True, + strict=False, # Warnings not raising exceptions + log_details=False, # May have wifi passwords in net cfg log_deprecations=True, ) - # ensure all physical devices in config are present self.distro.networking.wait_for_physdevs(netcfg) diff --git a/tests/unittests/config/test_schema.py b/tests/unittests/config/test_schema.py index c52177480079..dfec7b657ac0 100644 --- a/tests/unittests/config/test_schema.py +++ b/tests/unittests/config/test_schema.py @@ -33,6 +33,7 @@ handle_schema_args, load_doc, main, + netplan_validate_network_schema, validate_cloudconfig_file, validate_cloudconfig_metaschema, validate_cloudconfig_schema, @@ -330,11 +331,97 @@ def test_schema_validation_error_expects_schema_errors(self): self.assertTrue(isinstance(exception, ValueError)) +class FakeNetplanParserException(Exception): + def __init__(self, filename, line, column, message): + self.filename = filename + self.line = line + self.column = column + self.message = message + + +class TestNetplanValidateNetworkSchema: + """Tests for netplan_validate_network_schema. + + Heavily mocked because github.com/canonical/netplan project does not + have a pyproject.toml or setup.py or pypi release that allows us to + define tox unittest dependencies. + """ + + @pytest.mark.parametrize( + "config,expected_log", + ( + ({}, ""), + ({"version": 1}, ""), + ( + {"version": 2}, + "Skipping netplan schema validation." + " No python3-netplan available", + ), + ( + {"network": {"version": 2}}, + "Skipping netplan schema validation." + " No python3-netplan available", + ), + ), + ) + def test_network_config_schema_validation_false_when_skipped( + self, config, expected_log, caplog + ): + """netplan_validate_network_schema returns false when skipped.""" + with mock.patch.dict("sys.modules"): + sys.modules.pop("netplan", None) + assert False is netplan_validate_network_schema(config) + assert expected_log in caplog.text + + @pytest.mark.parametrize( + "error,error_log", + ( + (None, ""), + ( + FakeNetplanParserException( + "net.yaml", + line=1, + column=12, + message="incorrect YAML value: yes for dhcp value", + ), + "Invalid network-config provided:\nformat-l1.c12: Invalid" + " netplan schema. incorrect YAML value: yes for dhcp value", + ), + ), + ) + def test_network_config_schema_validation( + self, error, error_log, caplog, tmpdir + ): + + fake_tmpdir = tmpdir.join("mkdtmp") + + class FakeParser: + def load_yaml_hierarchy(self, parse_dir): + # Since we mocked mkdtemp to tmpdir, assert we pass tmpdir + assert parse_dir == fake_tmpdir + if error: + raise error + + # Mock expected imports + with mock.patch.dict( + "sys.modules", + netplan=mock.MagicMock( + NetplanParserException=FakeNetplanParserException, + Parser=FakeParser, + ), + ): + with mock.patch( + "cloudinit.config.schema.mkdtemp", + return_value=fake_tmpdir.strpath, + ): + assert netplan_validate_network_schema({"version": 2}) + if error_log: + assert error_log in caplog.text + + class TestValidateCloudConfigSchema: """Tests for validate_cloudconfig_schema.""" - with_logs = True - @pytest.mark.parametrize( "schema, call_count", ((None, 1), ({"properties": {"p1": {"type": "string"}}}, 0)), @@ -356,7 +443,7 @@ def test_validateconfig_schema_use_full_schema_when_no_schema_param( def test_validateconfig_schema_non_strict_emits_warnings(self, caplog): """When strict is False validate_cloudconfig_schema emits warnings.""" schema = {"properties": {"p1": {"type": "string"}}} - validate_cloudconfig_schema({"p1": -1}, schema, strict=False) + validate_cloudconfig_schema({"p1": -1}, schema=schema, strict=False) [(module, log_level, log_msg)] = caplog.record_tuples assert "cloudinit.config.schema" == module assert logging.WARNING == log_level @@ -374,7 +461,7 @@ def test_validateconfig_schema_sensitive(self, caplog): } validate_cloudconfig_schema( {"hashed-password": "secret"}, - schema, + schema=schema, strict=False, log_details=False, ) @@ -403,7 +490,7 @@ def test_validateconfig_schema_strict_raises_errors(self): """When strict is True validate_cloudconfig_schema raises errors.""" schema = {"properties": {"p1": {"type": "string"}}} with pytest.raises(SchemaValidationError) as context_mgr: - validate_cloudconfig_schema({"p1": -1}, schema, strict=True) + validate_cloudconfig_schema({"p1": -1}, schema=schema, strict=True) assert ( "Cloud config schema errors: p1: -1 is not of type 'string'" == (str(context_mgr.value)) @@ -414,7 +501,9 @@ def test_validateconfig_schema_honors_formats(self): """With strict True, validate_cloudconfig_schema errors on format.""" schema = {"properties": {"p1": {"type": "string", "format": "email"}}} with pytest.raises(SchemaValidationError) as context_mgr: - validate_cloudconfig_schema({"p1": "-1"}, schema, strict=True) + validate_cloudconfig_schema( + {"p1": "-1"}, schema=schema, strict=True + ) assert "Cloud config schema errors: p1: '-1' is not a 'email'" == ( str(context_mgr.value) ) @@ -425,7 +514,10 @@ def test_validateconfig_schema_honors_formats_strict_metaschema(self): schema = {"properties": {"p1": {"type": "string", "format": "email"}}} with pytest.raises(SchemaValidationError) as context_mgr: validate_cloudconfig_schema( - {"p1": "-1"}, schema, strict=True, strict_metaschema=True + {"p1": "-1"}, + schema=schema, + strict=True, + strict_metaschema=True, ) assert "Cloud config schema errors: p1: '-1' is not a 'email'" == str( context_mgr.value @@ -442,7 +534,7 @@ def test_validateconfig_strict_metaschema_do_not_raise_exception( """ schema = {"properties": {"p1": {"types": "string", "format": "email"}}} validate_cloudconfig_schema( - {"p1": "-1"}, schema, strict_metaschema=True + {"p1": "-1"}, schema=schema, strict_metaschema=True ) assert ( "Meta-schema validation failed, attempting to validate config" @@ -657,7 +749,7 @@ def test_validateconfig_logs_deprecations( ): validate_cloudconfig_schema( config, - schema, + schema=schema, strict_metaschema=True, log_deprecations=log_deprecations, ) @@ -714,7 +806,7 @@ def test_validateconfig_schema_of_example(self, schema_id, example): ] ) schema["properties"].update(supplemental_props) - validate_cloudconfig_schema(config_load, schema, strict=True) + validate_cloudconfig_schema(config_load, schema=schema, strict=True) @pytest.mark.usefixtures("fake_filesystem") @@ -1794,7 +1886,10 @@ def test_main_prints_docs(self, _read_cfg_paths, capsys): ("cloud-config", b"#cloud-config\nntp:", "Valid schema"), ( "network-config", - b"network: {'version': 2, 'ethernets': {'eth0': {'dhcp': true}}}", + ( + b"network: {'version': 2, 'ethernets':" + b" {'eth0': {'dhcp': true}}}" + ), "Skipping network-config schema validation. No network schema" " for version: 2", ), @@ -1817,7 +1912,11 @@ def test_main_validates_config_file( myargs += ["--schema-type", schema_type] myyaml.write(content) # shortest ntp schema with mock.patch("sys.argv", myargs): - assert 0 == main(), "Expected 0 exit code" + # Always assert we have no netplan module which triggers + # schema skip of network-config version: 2 until cloud-init + # grows internal schema-network-config-v2.json. + with mock.patch.dict("sys.modules", netplan=ImportError()): + assert 0 == main(), "Expected 0 exit code" out, _err = capsys.readouterr() assert expected in out @@ -1943,7 +2042,7 @@ def test_main_processed_data_preference_over_raw_data( id="netv2_validation_is_skipped", ), pytest.param( - "network:\n", + "network: {}\n", "Skipping network-config schema validation on empty config.", does_not_raise(), id="empty_net_validation_is_skipped", @@ -1992,8 +2091,12 @@ def test_main_validates_system_userdata_vendordata_and_network_config( write_file(network_file, net_config) myargs = ["mycmd", "--system"] with error_raised: - with mock.patch("sys.argv", myargs): - main() + # Always assert we have no netplan module which triggers + # schema skip of network-config version: 2 until cloud-init + # grows internal schema-network-config-v2.json. + with mock.patch.dict("sys.modules", netplan=ImportError()): + with mock.patch("sys.argv", myargs): + main() out, _err = capsys.readouterr() net_output = net_output.format(network_file=network_file) diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index b47f3a2ef483..783e70dbd527 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -45,7 +45,6 @@ except ImportError: HAS_APT_PKG = False - # Makes the old path start # with new base instead of whatever # it previously had