diff --git a/docs/CONTRIBUTORS.rst b/docs/CONTRIBUTORS.rst index 54ef8a8d40..302c8c205b 100644 --- a/docs/CONTRIBUTORS.rst +++ b/docs/CONTRIBUTORS.rst @@ -113,8 +113,9 @@ CI/CD will check that new TUF code is formatted with `black Auto-formatting can be done on the command line: :: - $ black - $ isort + $ # TODO: configure black and isort args in pyproject.toml (see #1161) + $ black --line-length 80 tuf/api + $ isort --line-length 80 --profile black -p tuf tuf/api or via source code editor plugin [`black `__, diff --git a/pyproject.toml b/pyproject.toml index 339f2416dc..2f21011953 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,90 +1,3 @@ -# Build-system section [build-system] requires = ["setuptools>=40.8.0", "wheel"] build-backend = "setuptools.build_meta" - -# Black section -# Read more here: https://black.readthedocs.io/en/stable/usage_and_configuration/the_basics.html#configuration-via-a-file -[tool.black] -line-length=80 - -# Isort section -# Read more here: https://pycqa.github.io/isort/docs/configuration/config_files.html -[tool.isort] -profile="black" -line_length=80 -known_first_party = ["tuf"] - -# Pylint section - -# Minimal pylint configuration file for Secure Systems Lab Python Style Guide: -# https://github.com/secure-systems-lab/code-style-guidelines -# -# Based on Google Python Style Guide pylintrc and pylint defaults: -# https://google.github.io/styleguide/pylintrc -# http://pylint.pycqa.org/en/latest/technical_reference/features.html - -[tool.pylint.message_control] -# Disable the message, report, category or checker with the given id(s). -# NOTE: To keep this config as short as possible we only disable checks that -# are currently in conflict with our code. If new code displeases the linter -# (for good reasons) consider updating this config file, or disable checks with. 
-disable=[ - "fixme", - "too-few-public-methods", - "too-many-arguments", - "format", - "duplicate-code" -] - -[tool.pylint.basic] -good-names = ["i","j","k","v","e","f","fn","fp","_type","_"] -# Regexes for allowed names are copied from the Google pylintrc -# NOTE: Pylint captures regex name groups such as 'snake_case' or 'camel_case'. -# If there are multiple groups it enfoces the prevalent naming style inside -# each modules. Names in the exempt capturing group are ignored. -function-rgx="^(?:(?PsetUp|tearDown|setUpModule|tearDownModule)|(?P_?[A-Z][a-zA-Z0-9]*)|(?P_?[a-z][a-z0-9_]*))$" -method-rgx="(?x)^(?:(?P_[a-z0-9_]+__|runTest|setUp|tearDown|setUpTestCase|tearDownTestCase|setupSelf|tearDownClass|setUpClass|(test|assert)_*[A-Z0-9][a-zA-Z0-9_]*|next)|(?P_{0,2}[A-Z][a-zA-Z0-9_]*)|(?P_{0,2}[a-z][a-z0-9_]*))$" -argument-rgx="^[a-z][a-z0-9_]*$" -attr-rgx="^_{0,2}[a-z][a-z0-9_]*$" -class-attribute-rgx="^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$" -class-rgx="^_?[A-Z][a-zA-Z0-9]*$" -const-rgx="^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$" -inlinevar-rgx="^[a-z][a-z0-9_]*$" -module-rgx="^(_?[a-z][a-z0-9_]*|__init__)$" -no-docstring-rgx="(__.*__|main|test.*|.*test|.*Test)$" -variable-rgx="^[a-z][a-z0-9_]*$" -docstring-min-length=10 - -[tool.pylint.logging] -logging-format-style="old" - -[tool.pylint.miscellaneous] -notes="TODO" - -[tool.pylint.STRING] -check-quote-consistency="yes" - -# mypy section -# Read more here: https://mypy.readthedocs.io/en/stable/config_file.html#using-a-pyproject-toml-file -[tool.mypy] -warn_unused_configs = "True" -warn_redundant_casts = "True" -warn_unused_ignores = "True" -warn_unreachable = "True" -strict_equality = "True" -disallow_untyped_defs = "True" -disallow_untyped_calls = "True" -show_error_codes = "True" -files = [ - "tuf/api/", - "tuf/ngclient", - "tuf/exceptions.py" -] - -[[tool.mypy.overrides]] -module = [ - "securesystemslib.*", - "urllib3.*" -] -ignore_missing_imports = "True" diff --git a/setup.cfg 
b/setup.cfg index 73a975b948..d86316a13e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -52,3 +52,23 @@ tuf = py.typed ignore = .fossa.yml .readthedocs.yaml + +[mypy] +warn_unused_configs = True +warn_redundant_casts = True +warn_unused_ignores = True +warn_unreachable = True +strict_equality = True +disallow_untyped_defs = True +disallow_untyped_calls = True +show_error_codes = True +files = + tuf/api/, + tuf/ngclient, + tuf/exceptions.py + +[mypy-securesystemslib.*] +ignore_missing_imports = True + +[mypy-urllib3.*] +ignore_missing_imports = True diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index bb86b59cb7..ec1044007c 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -48,7 +48,7 @@ import os import tempfile from collections import OrderedDict -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Dict, Iterator, List, Optional, Tuple from urllib import parse @@ -81,6 +81,14 @@ SPEC_VER = ".".join(SPECIFICATION_VERSION) +@dataclass +class FetchTracker: + """Fetcher counter for metadata and targets.""" + + metadata: List[Tuple[str, Optional[int]]] = field(default_factory=list) + targets: List[Tuple[str, Optional[str]]] = field(default_factory=list) + + @dataclass class RepositoryTarget: """Contains actual target data and the related target metadata.""" @@ -116,6 +124,8 @@ def __init__(self) -> None: self.dump_dir: Optional[str] = None self.dump_version = 0 + self.fetch_tracker = FetchTracker() + now = datetime.utcnow() self.safe_expiry = now.replace(microsecond=0) + timedelta(days=30) @@ -139,7 +149,7 @@ def targets(self) -> Targets: def all_targets(self) -> Iterator[Tuple[str, Targets]]: """Yield role name and signed portion of targets one by one.""" - yield Targets.type, self.md_targets.signed + yield "targets", self.md_targets.signed for role, md in self.md_delegates.items(): yield role, md.signed @@ -181,7 +191,7 @@ def 
_initialize(self) -> None: def publish_root(self) -> None: """Sign and store a new serialized version of root.""" self.md_root.signatures.clear() - for signer in self.signers[Root.type].values(): + for signer in self.signers["root"].values(): self.md_root.sign(signer, append=True) self.signed_roots.append(self.md_root.to_bytes(JSONSerializer())) @@ -197,8 +207,8 @@ def fetch(self, url: str) -> Iterator[bytes]: ver_and_name = path[len("/metadata/") :][: -len(".json")] version_str, _, role = ver_and_name.partition(".") # root is always version-prefixed while timestamp is always NOT - if role == Root.type or ( - self.root.consistent_snapshot and ver_and_name != Timestamp.type + if role == "root" or ( + self.root.consistent_snapshot and ver_and_name != "timestamp" ): version: Optional[int] = int(version_str) else: @@ -229,6 +239,8 @@ def _fetch_target( If hash is None, then consistent_snapshot is not used. """ + self.fetch_tracker.targets.append((target_path, target_hash)) + repo_target = self.target_files.get(target_path) if repo_target is None: raise FetcherHTTPError(f"No target {target_path}", 404) @@ -248,7 +260,9 @@ def _fetch_metadata( If version is None, non-versioned metadata is being requested. 
""" - if role == Root.type: + self.fetch_tracker.metadata.append((role, version)) + + if role == "root": # return a version previously serialized in publish_root() if version is None or version > len(self.signed_roots): raise FetcherHTTPError(f"Unknown root version {version}", 404) @@ -257,11 +271,11 @@ def _fetch_metadata( # sign and serialize the requested metadata md: Optional[Metadata] - if role == Timestamp.type: + if role == "timestamp": md = self.md_timestamp - elif role == Snapshot.type: + elif role == "snapshot": md = self.md_snapshot - elif role == Targets.type: + elif role == "targets": md = self.md_targets else: md = self.md_delegates.get(role) @@ -297,7 +311,7 @@ def update_timestamp(self) -> None: self.timestamp.snapshot_meta.version = self.snapshot.version if self.compute_metafile_hashes_length: - hashes, length = self._compute_hashes_and_length(Snapshot.type) + hashes, length = self._compute_hashes_and_length("snapshot") self.timestamp.snapshot_meta.hashes = hashes self.timestamp.snapshot_meta.length = length @@ -320,7 +334,7 @@ def update_snapshot(self) -> None: def add_target(self, role: str, data: bytes, path: str) -> None: """Create a target from data and add it to the target_files.""" - if role == Targets.type: + if role == "targets": targets = self.targets else: targets = self.md_delegates[role].signed @@ -339,7 +353,7 @@ def add_delegation( hash_prefixes: Optional[List[str]], ) -> None: """Add delegated target role to the repository.""" - if delegator_name == Targets.type: + if delegator_name == "targets": delegator = self.targets else: delegator = self.md_delegates[delegator_name].signed @@ -375,9 +389,9 @@ def write(self) -> None: for ver in range(1, len(self.signed_roots) + 1): with open(os.path.join(dest_dir, f"{ver}.root.json"), "wb") as f: - f.write(self._fetch_metadata(Root.type, ver)) + f.write(self._fetch_metadata("root", ver)) - for role in [Timestamp.type, Snapshot.type, Targets.type]: + for role in ["timestamp", "snapshot", 
"targets"]: with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f: f.write(self._fetch_metadata(role)) diff --git a/tests/test_api.py b/tests/test_api.py index 8bd69c9b32..02c6521725 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -78,7 +78,7 @@ def setUpClass(cls) -> None: # Load keys into memory cls.keystore = {} - for role in ["delegation", Snapshot.type, Targets.type, Timestamp.type]: + for role in ["delegation", "snapshot", "targets", "timestamp"]: cls.keystore[role] = import_ed25519_privatekey_from_file( os.path.join(cls.keystore_dir, role + "_key"), password="password", @@ -92,10 +92,10 @@ def tearDownClass(cls) -> None: def test_generic_read(self) -> None: for metadata, inner_metadata_cls in [ - (Root.type, Root), - (Snapshot.type, Snapshot), - (Timestamp.type, Timestamp), - (Targets.type, Targets), + ("root", Root), + ("snapshot", Snapshot), + ("timestamp", Timestamp), + ("targets", Targets), ]: # Load JSON-formatted metdata of each supported type from file @@ -136,7 +136,7 @@ def test_compact_json(self) -> None: ) def test_read_write_read_compare(self) -> None: - for metadata in [Root.type, Snapshot.type, Timestamp.type, Targets.type]: + for metadata in ["root", "snapshot", "timestamp", "targets"]: path = os.path.join(self.repo_dir, "metadata", metadata + ".json") md_obj = Metadata.from_file(path) @@ -148,7 +148,7 @@ def test_read_write_read_compare(self) -> None: os.remove(path_2) def test_to_from_bytes(self) -> None: - for metadata in [Root.type, Snapshot.type, Timestamp.type, Targets.type]: + for metadata in ["root", "snapshot", "timestamp", "targets"]: path = os.path.join(self.repo_dir, "metadata", metadata + ".json") with open(path, "rb") as f: metadata_bytes = f.read() @@ -169,11 +169,11 @@ def test_sign_verify(self) -> None: root = Metadata[Root].from_file(root_path).signed # Locate the public keys we need from root - targets_keyid = next(iter(root.roles[Targets.type].keyids)) + targets_keyid = 
next(iter(root.roles["targets"].keyids)) targets_key = root.keys[targets_keyid] - snapshot_keyid = next(iter(root.roles[Snapshot.type].keyids)) + snapshot_keyid = next(iter(root.roles["snapshot"].keyids)) snapshot_key = root.keys[snapshot_keyid] - timestamp_keyid = next(iter(root.roles[Timestamp.type].keyids)) + timestamp_keyid = next(iter(root.roles["timestamp"].keyids)) timestamp_key = root.keys[timestamp_keyid] # Load sample metadata (targets) and assert ... @@ -192,7 +192,7 @@ def test_sign_verify(self) -> None: with self.assertRaises(exceptions.UnsignedMetadataError): targets_key.verify_signature(md_obj, JSONSerializer()) # type: ignore[arg-type] - sslib_signer = SSlibSigner(self.keystore[Snapshot.type]) + sslib_signer = SSlibSigner(self.keystore["snapshot"]) # Append a new signature with the unrelated key and assert that ... sig = md_obj.sign(sslib_signer, append=True) # ... there are now two signatures, and @@ -203,7 +203,7 @@ def test_sign_verify(self) -> None: # ... the returned (appended) signature is for snapshot key self.assertEqual(sig.keyid, snapshot_keyid) - sslib_signer = SSlibSigner(self.keystore[Timestamp.type]) + sslib_signer = SSlibSigner(self.keystore["timestamp"]) # Create and assign (don't append) a new signature and assert that ... md_obj.sign(sslib_signer, append=False) # ... 
there now is only one signature, @@ -218,7 +218,7 @@ def test_verify_failures(self) -> None: root = Metadata[Root].from_file(root_path).signed # Locate the timestamp public key we need from root - timestamp_keyid = next(iter(root.roles[Timestamp.type].keyids)) + timestamp_keyid = next(iter(root.roles["timestamp"].keyids)) timestamp_key = root.keys[timestamp_keyid] # Load sample metadata (timestamp) @@ -369,20 +369,20 @@ def test_metadata_verify_delegate(self) -> None: role2 = Metadata[Targets].from_file(role2_path) # test the expected delegation tree - root.verify_delegate(Root.type, root) - root.verify_delegate(Snapshot.type, snapshot) - root.verify_delegate(Targets.type, targets) + root.verify_delegate("root", root) + root.verify_delegate("snapshot", snapshot) + root.verify_delegate("targets", targets) targets.verify_delegate("role1", role1) role1.verify_delegate("role2", role2) # only root and targets can verify delegates with self.assertRaises(TypeError): - snapshot.verify_delegate(Snapshot.type, snapshot) + snapshot.verify_delegate("snapshot", snapshot) # verify fails for roles that are not delegated by delegator with self.assertRaises(ValueError): root.verify_delegate("role1", role1) with self.assertRaises(ValueError): - targets.verify_delegate(Targets.type, targets) + targets.verify_delegate("targets", targets) # verify fails when delegator has no delegations with self.assertRaises(ValueError): role2.verify_delegate("role1", role1) @@ -391,31 +391,31 @@ def test_metadata_verify_delegate(self) -> None: expires = snapshot.signed.expires snapshot.signed.bump_expiration() with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate(Snapshot.type, snapshot) + root.verify_delegate("snapshot", snapshot) snapshot.signed.expires = expires # verify fails if roles keys do not sign the metadata with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate(Timestamp.type, snapshot) + root.verify_delegate("timestamp", snapshot) # Add 
a key to snapshot role, make sure the new sig fails to verify - ts_keyid = next(iter(root.signed.roles[Timestamp.type].keyids)) - root.signed.add_key(Snapshot.type, root.signed.keys[ts_keyid]) + ts_keyid = next(iter(root.signed.roles["timestamp"].keyids)) + root.signed.add_key("snapshot", root.signed.keys[ts_keyid]) snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64) # verify succeeds if threshold is reached even if some signatures # fail to verify - root.verify_delegate(Snapshot.type, snapshot) + root.verify_delegate("snapshot", snapshot) # verify fails if threshold of signatures is not reached - root.signed.roles[Snapshot.type].threshold = 2 + root.signed.roles["snapshot"].threshold = 2 with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate(Snapshot.type, snapshot) + root.verify_delegate("snapshot", snapshot) # verify succeeds when we correct the new signature and reach the # threshold of 2 keys - snapshot.sign(SSlibSigner(self.keystore[Timestamp.type]), append=True) - root.verify_delegate(Snapshot.type, snapshot) + snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True) + root.verify_delegate("snapshot", snapshot) def test_key_class(self) -> None: # Test if from_securesystemslib_key removes the private key from keyval @@ -441,14 +441,14 @@ def test_root_add_key_and_remove_key(self) -> None: ) # Assert that root does not contain the new key - self.assertNotIn(keyid, root.signed.roles[Root.type].keyids) + self.assertNotIn(keyid, root.signed.roles["root"].keyids) self.assertNotIn(keyid, root.signed.keys) # Add new root key - root.signed.add_key(Root.type, key_metadata) + root.signed.add_key("root", key_metadata) # Assert that key is added - self.assertIn(keyid, root.signed.roles[Root.type].keyids) + self.assertIn(keyid, root.signed.roles["root"].keyids) self.assertIn(keyid, root.signed.keys) # Confirm that the newly added key does not break @@ -456,29 +456,29 @@ def test_root_add_key_and_remove_key(self) -> None: 
root.to_dict() # Try adding the same key again and assert its ignored. - pre_add_keyid = root.signed.roles[Root.type].keyids.copy() - root.signed.add_key(Root.type, key_metadata) - self.assertEqual(pre_add_keyid, root.signed.roles[Root.type].keyids) + pre_add_keyid = root.signed.roles["root"].keyids.copy() + root.signed.add_key("root", key_metadata) + self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids) # Add the same key to targets role as well - root.signed.add_key(Targets.type, key_metadata) + root.signed.add_key("targets", key_metadata) # Add the same key to a nonexistent role. with self.assertRaises(ValueError): root.signed.add_key("nosuchrole", key_metadata) # Remove the key from root role (targets role still uses it) - root.signed.remove_key(Root.type, keyid) - self.assertNotIn(keyid, root.signed.roles[Root.type].keyids) + root.signed.remove_key("root", keyid) + self.assertNotIn(keyid, root.signed.roles["root"].keyids) self.assertIn(keyid, root.signed.keys) # Remove the key from targets as well - root.signed.remove_key(Targets.type, keyid) - self.assertNotIn(keyid, root.signed.roles[Targets.type].keyids) + root.signed.remove_key("targets", keyid) + self.assertNotIn(keyid, root.signed.roles["targets"].keyids) self.assertNotIn(keyid, root.signed.keys) with self.assertRaises(ValueError): - root.signed.remove_key(Root.type, "nosuchkey") + root.signed.remove_key("root", "nosuchkey") with self.assertRaises(ValueError): root.signed.remove_key("nosuchrole", keyid) @@ -670,7 +670,7 @@ def test_length_and_hash_validation(self) -> None: targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) file1_targetfile = targets.signed.targets["file1.txt"] - filepath = os.path.join(self.repo_dir, Targets.type, "file1.txt") + filepath = os.path.join(self.repo_dir, "targets", "file1.txt") with open(filepath, "rb") as file1: file1_targetfile.verify_length_and_hashes(file1) @@ -688,7 +688,7 @@ def 
test_length_and_hash_validation(self) -> None: def test_targetfile_from_file(self) -> None: # Test with an existing file and valid hash algorithm - file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt") + file_path = os.path.join(self.repo_dir, "targets", "file1.txt") targetfile_from_file = TargetFile.from_file( file_path, file_path, ["sha256"] ) @@ -697,20 +697,20 @@ def test_targetfile_from_file(self) -> None: targetfile_from_file.verify_length_and_hashes(file) # Test with a non-existing file - file_path = os.path.join(self.repo_dir, Targets.type, "file123.txt") + file_path = os.path.join(self.repo_dir, "targets", "file123.txt") with self.assertRaises(FileNotFoundError): TargetFile.from_file( file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM] ) # Test with an unsupported algorithm - file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt") + file_path = os.path.join(self.repo_dir, "targets", "file1.txt") with self.assertRaises(exceptions.UnsupportedAlgorithmError): TargetFile.from_file(file_path, file_path, ["123"]) def test_targetfile_from_data(self) -> None: data = b"Inline test content" - target_file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt") + target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt") # Test with a valid hash algorithm targetfile_from_data = TargetFile.from_data( diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 6a3142c916..9dfacf1a1d 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -58,10 +58,10 @@ def setUpClass(cls) -> None: ) cls.metadata = {} for md in [ - Root.type, - Timestamp.type, - Snapshot.type, - Targets.type, + "root", + "timestamp", + "snapshot", + "targets", "role1", "role2", ]: @@ -71,10 +71,10 @@ def setUpClass(cls) -> None: keystore_dir = os.path.join(os.getcwd(), "repository_data", "keystore") cls.keystore = {} root_key_dict = import_rsa_privatekey_from_file( - 
os.path.join(keystore_dir, Root.type + "_key"), password="password" + os.path.join(keystore_dir, "root" + "_key"), password="password" ) - cls.keystore[Root.type] = SSlibSigner(root_key_dict) - for role in ["delegation", Snapshot.type, Targets.type, Timestamp.type]: + cls.keystore["root"] = SSlibSigner(root_key_dict) + for role in ["delegation", "snapshot", "targets", "timestamp"]: key_dict = import_ed25519_privatekey_from_file( os.path.join(keystore_dir, role + "_key"), password="password" ) @@ -84,12 +84,12 @@ def hashes_length_modifier(timestamp: Timestamp) -> None: timestamp.snapshot_meta.hashes = None timestamp.snapshot_meta.length = None - cls.metadata[Timestamp.type] = cls.modify_metadata( - Timestamp.type, hashes_length_modifier + cls.metadata["timestamp"] = cls.modify_metadata( + "timestamp", hashes_length_modifier ) def setUp(self) -> None: - self.trusted_set = TrustedMetadataSet(self.metadata[Root.type]) + self.trusted_set = TrustedMetadataSet(self.metadata["root"]) def _update_all_besides_targets( self, @@ -101,24 +101,24 @@ def _update_all_besides_targets( Args: timestamp_bytes: Bytes used when calling trusted_set.update_timestamp(). - Default self.metadata[Timestamp.type]. + Default self.metadata["timestamp"]. snapshot_bytes: Bytes used when calling trusted_set.update_snapshot(). - Default self.metadata[Snapshot.type]. + Default self.metadata["snapshot"]. 
""" - timestamp_bytes = timestamp_bytes or self.metadata[Timestamp.type] + timestamp_bytes = timestamp_bytes or self.metadata["timestamp"] self.trusted_set.update_timestamp(timestamp_bytes) - snapshot_bytes = snapshot_bytes or self.metadata[Snapshot.type] + snapshot_bytes = snapshot_bytes or self.metadata["snapshot"] self.trusted_set.update_snapshot(snapshot_bytes) def test_update(self) -> None: - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_targets(self.metadata["targets"]) self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", Targets.type + self.metadata["role1"], "role1", "targets" ) self.trusted_set.update_delegated_targets( self.metadata["role2"], "role2", "role1" @@ -154,38 +154,38 @@ def test_update_metadata_output(self) -> None: def test_out_of_order_ops(self) -> None: # Update snapshot before timestamp with self.assertRaises(RuntimeError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) # Update root after timestamp with self.assertRaises(RuntimeError): - self.trusted_set.update_root(self.metadata[Root.type]) + self.trusted_set.update_root(self.metadata["root"]) # Update targets before snapshot with self.assertRaises(RuntimeError): - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata["targets"]) - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) # update timestamp after snapshot with 
self.assertRaises(RuntimeError): - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) # Update delegated targets before targets with self.assertRaises(RuntimeError): self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", Targets.type + self.metadata["role1"], "role1", "targets" ) - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata["targets"]) # Update snapshot after sucessful targets update with self.assertRaises(RuntimeError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", Targets.type + self.metadata["role1"], "role1", "targets" ) def test_root_with_invalid_json(self) -> None: @@ -196,20 +196,20 @@ def test_root_with_invalid_json(self) -> None: test_func(b"") # root is invalid - root = Metadata.from_bytes(self.metadata[Root.type]) + root = Metadata.from_bytes(self.metadata["root"]) root.signed.version += 1 with self.assertRaises(exceptions.UnsignedMetadataError): test_func(root.to_bytes()) # metadata is of wrong type with self.assertRaises(exceptions.RepositoryError): - test_func(self.metadata[Snapshot.type]) + test_func(self.metadata["snapshot"]) def test_top_level_md_with_invalid_json(self) -> None: top_level_md: List[Tuple[bytes, Callable[[bytes], Metadata]]] = [ - (self.metadata[Timestamp.type], self.trusted_set.update_timestamp), - (self.metadata[Snapshot.type], self.trusted_set.update_snapshot), - (self.metadata[Targets.type], self.trusted_set.update_targets), + (self.metadata["timestamp"], self.trusted_set.update_timestamp), + (self.metadata["snapshot"], self.trusted_set.update_snapshot), + (self.metadata["targets"], self.trusted_set.update_targets), ] for metadata, update_func in top_level_md: md = Metadata.from_bytes(metadata) @@ -224,7 
+224,7 @@ def test_top_level_md_with_invalid_json(self) -> None: # metadata is of wrong type with self.assertRaises(exceptions.RepositoryError): - update_func(self.metadata[Root.type]) + update_func(self.metadata["root"]) update_func(metadata) @@ -233,53 +233,53 @@ def test_update_root_new_root(self) -> None: def root_new_version_modifier(root: Root) -> None: root.version += 1 - root = self.modify_metadata(Root.type, root_new_version_modifier) + root = self.modify_metadata("root", root_new_version_modifier) self.trusted_set.update_root(root) def test_update_root_new_root_fail_threshold_verification(self) -> None: # new_root data with threshold which cannot be verified. - root = Metadata.from_bytes(self.metadata[Root.type]) + root = Metadata.from_bytes(self.metadata["root"]) # remove root role keyids representing root signatures - root.signed.roles[Root.type].keyids = set() + root.signed.roles["root"].keyids = set() with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_root(root.to_bytes()) def test_update_root_new_root_ver_same_as_trusted_root_ver(self) -> None: with self.assertRaises(exceptions.ReplayedMetadataError): - self.trusted_set.update_root(self.metadata[Root.type]) + self.trusted_set.update_root(self.metadata["root"]) def test_root_expired_final_root(self) -> None: def root_expired_modifier(root: Root) -> None: root.expires = datetime(1970, 1, 1) # intermediate root can be expired - root = self.modify_metadata(Root.type, root_expired_modifier) + root = self.modify_metadata("root", root_expired_modifier) tmp_trusted_set = TrustedMetadataSet(root) # update timestamp to trigger final root expiry check with self.assertRaises(exceptions.ExpiredMetadataError): - tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type]) + tmp_trusted_set.update_timestamp(self.metadata["timestamp"]) def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self) -> None: # new_timestamp.version < trusted_timestamp.version def 
version_modifier(timestamp: Timestamp) -> None: timestamp.version = 3 - timestamp = self.modify_metadata(Timestamp.type, version_modifier) + timestamp = self.modify_metadata("timestamp", version_modifier) self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.ReplayedMetadataError): - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) def test_update_timestamp_snapshot_ver_below_current(self) -> None: def bump_snapshot_version(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 # set current known snapshot.json version to 2 - timestamp = self.modify_metadata(Timestamp.type, bump_snapshot_version) + timestamp = self.modify_metadata("timestamp", bump_snapshot_version) self.trusted_set.update_timestamp(timestamp) # newtimestamp.meta.version < trusted_timestamp.meta.version with self.assertRaises(exceptions.ReplayedMetadataError): - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) def test_update_timestamp_expired(self) -> None: # new_timestamp has expired @@ -288,29 +288,29 @@ def timestamp_expired_modifier(timestamp: Timestamp) -> None: # expired intermediate timestamp is loaded but raises timestamp = self.modify_metadata( - Timestamp.type, timestamp_expired_modifier + "timestamp", timestamp_expired_modifier ) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_timestamp(timestamp) # snapshot update does start but fails because timestamp is expired with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) def test_update_snapshot_length_or_hash_mismatch(self) -> None: def modify_snapshot_length(timestamp: Timestamp) -> None: timestamp.snapshot_meta.length = 1 # set known snapshot.json length to 1 - timestamp = 
self.modify_metadata(Timestamp.type, modify_snapshot_length) + timestamp = self.modify_metadata("timestamp", modify_snapshot_length) self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) def test_update_snapshot_fail_threshold_verification(self) -> None: - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) - snapshot = Metadata.from_bytes(self.metadata[Snapshot.type]) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) + snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signatures.clear() with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_snapshot(snapshot.to_bytes()) @@ -322,55 +322,55 @@ def timestamp_version_modifier(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 timestamp = self.modify_metadata( - Timestamp.type, timestamp_version_modifier + "timestamp", timestamp_version_modifier ) self.trusted_set.update_timestamp(timestamp) # if intermediate snapshot version is incorrect, load it but also raise with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) # targets update starts but fails if snapshot version does not match with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata["targets"]) def test_update_snapshot_file_removed_from_meta(self) -> None: - self._update_all_besides_targets(self.metadata[Timestamp.type]) + self._update_all_besides_targets(self.metadata["timestamp"]) def remove_file_from_meta(snapshot: Snapshot) -> None: del snapshot.meta["targets.json"] # Test removing a meta_file in new_snapshot compared to the old snapshot - snapshot = 
self.modify_metadata(Snapshot.type, remove_file_from_meta) + snapshot = self.modify_metadata("snapshot", remove_file_from_meta) with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_snapshot(snapshot) def test_update_snapshot_meta_version_decreases(self) -> None: - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) def version_meta_modifier(snapshot: Snapshot) -> None: snapshot.meta["targets.json"].version += 1 - snapshot = self.modify_metadata(Snapshot.type, version_meta_modifier) + snapshot = self.modify_metadata("snapshot", version_meta_modifier) self.trusted_set.update_snapshot(snapshot) with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) def test_update_snapshot_expired_new_snapshot(self) -> None: - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) def snapshot_expired_modifier(snapshot: Snapshot) -> None: snapshot.expires = datetime(1970, 1, 1) # expired intermediate snapshot is loaded but will raise - snapshot = self.modify_metadata(Snapshot.type, snapshot_expired_modifier) + snapshot = self.modify_metadata("snapshot", snapshot_expired_modifier) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_snapshot(snapshot) # targets update does start but fails because snapshot is expired with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata["targets"]) def test_update_snapshot_successful_rollback_checks(self) -> None: def meta_version_bump(timestamp: Timestamp) -> None: @@ -380,51 +380,51 @@ def version_bump(snapshot: Snapshot) -> None: snapshot.version += 1 # load a "local" timestamp, then update to newer one: - 
self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) - new_timestamp = self.modify_metadata(Timestamp.type, meta_version_bump) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) + new_timestamp = self.modify_metadata("timestamp", meta_version_bump) self.trusted_set.update_timestamp(new_timestamp) # load a "local" snapshot with mismatching version (loading happens but # BadVersionNumberError is raised), then update to newer one: with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) - new_snapshot = self.modify_metadata(Snapshot.type, version_bump) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) + new_snapshot = self.modify_metadata("snapshot", version_bump) self.trusted_set.update_snapshot(new_snapshot) # update targets to trigger final snapshot meta version check - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_no_meta_in_snapshot(self) -> None: def no_meta_modifier(snapshot: Snapshot) -> None: snapshot.meta = {} - snapshot = self.modify_metadata(Snapshot.type, no_meta_modifier) - self._update_all_besides_targets(self.metadata[Timestamp.type], snapshot) + snapshot = self.modify_metadata("snapshot", no_meta_modifier) + self._update_all_besides_targets(self.metadata["timestamp"], snapshot) # remove meta information with information about targets from snapshot with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_hash_diverge_from_snapshot_meta_hash(self) -> None: def meta_length_modifier(snapshot: Snapshot) -> None: for metafile_path in snapshot.meta: snapshot.meta[metafile_path] = MetaFile(version=1, length=1) - snapshot = self.modify_metadata(Snapshot.type, meta_length_modifier) - 
self._update_all_besides_targets(self.metadata[Timestamp.type], snapshot) + snapshot = self.modify_metadata("snapshot", meta_length_modifier) + self._update_all_besides_targets(self.metadata["timestamp"], snapshot) # observed_hash != stored hash in snapshot meta for targets with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_version_diverge_snapshot_meta_version(self) -> None: def meta_modifier(snapshot: Snapshot) -> None: for metafile_path in snapshot.meta: snapshot.meta[metafile_path] = MetaFile(version=2) - snapshot = self.modify_metadata(Snapshot.type, meta_modifier) - self._update_all_besides_targets(self.metadata[Timestamp.type], snapshot) + snapshot = self.modify_metadata("snapshot", meta_modifier) + self._update_all_besides_targets(self.metadata["timestamp"], snapshot) # new_delegate.signed.version != meta.version stored in snapshot with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_expired_new_target(self) -> None: self._update_all_besides_targets() @@ -432,7 +432,7 @@ def test_update_targets_expired_new_target(self) -> None: def target_expired_modifier(target: Targets) -> None: target.expires = datetime(1970, 1, 1) - targets = self.modify_metadata(Targets.type, target_expired_modifier) + targets = self.modify_metadata("targets", target_expired_modifier) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_targets(targets) diff --git a/tests/test_updater_consistent_snapshot.py b/tests/test_updater_consistent_snapshot.py index 4289d7b860..d1df533baa 100644 --- a/tests/test_updater_consistent_snapshot.py +++ b/tests/test_updater_consistent_snapshot.py @@ -90,19 +90,19 @@ def _assert_targets_files_exist(self, filenames: Iterable[str]) 
-> None: "consistent_snaphot disabled": { "consistent_snapshot": False, "calls": [ - call("root", 3), - call("timestamp", None), - call("snapshot", None), - call("targets", None), + ("root", 3), + ("timestamp", None), + ("snapshot", None), + ("targets", None), ], }, "consistent_snaphot enabled": { "consistent_snapshot": True, "calls": [ - call("root", 3), - call("timestamp", None), - call("snapshot", 1), - call("targets", 1), + ("root", 3), + ("timestamp", None), + ("snapshot", 1), + ("targets", 1), ], }, } @@ -117,15 +117,14 @@ def test_top_level_roles_update(self, test_case_data: Dict[str, Any]): sim = self._init_repo(consistent_snapshot) updater = self._init_updater(sim) - with patch.object( - sim, "_fetch_metadata", wraps=sim._fetch_metadata - ) as wrapped_fetch: - updater.refresh() + # cleanup fetch tracker metadata + sim.fetch_tracker.metadata.clear() + updater.refresh() - # metadata files are fetched with the expected version (or None) - self.assertListEqual(wrapped_fetch.call_args_list, expected_calls) - # metadata files are always persisted without a version prefix - self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES) + # metadata files are fetched with the expected version (or None) + self.assertListEqual(sim.fetch_tracker.metadata, expected_calls) + # metadata files are always persisted without a version prefix + self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES) self._cleanup_dir(self.metadata_dir) @@ -147,7 +146,7 @@ def test_delegated_roles_update(self, test_case_data: Dict[str, Any]): consistent_snapshot: bool = test_case_data["consistent_snapshot"] expected_version: Optional[int] = test_case_data["expected_version"] rolenames = ["role1", "..", "."] - expected_calls = [call(role, expected_version) for role in rolenames] + expected_calls = [(role, expected_version) for role in rolenames] sim = self._init_repo(consistent_snapshot) # Add new delegated targets @@ -157,17 +156,17 @@ def test_delegated_roles_update(self, test_case_data: Dict[str, 
Any]): sim.add_delegation("targets", role, targets, False, ["*"], None) sim.update_snapshot() updater = self._init_updater(sim) + updater.refresh() - with patch.object( - sim, "_fetch_metadata", wraps=sim._fetch_metadata - ) as wrapped_fetch: - # trigger updater to fetch the delegated metadata - updater.get_targetinfo("anything") - # metadata files are fetched with the expected version (or None) - self.assertListEqual(wrapped_fetch.call_args_list, expected_calls) - # metadata files are always persisted without a version prefix - self._assert_metadata_files_exist(rolenames) + # cleanup fetch tracker metadata + sim.fetch_tracker.metadata.clear() + # trigger updater to fetch the delegated metadata + updater.get_targetinfo("anything") + # metadata files are fetched with the expected version (or None) + self.assertListEqual(sim.fetch_tracker.metadata, expected_calls) + # metadata files are always persisted without a version prefix + self._assert_metadata_files_exist(rolenames) self._cleanup_dir(self.metadata_dir) @@ -176,16 +175,19 @@ def test_delegated_roles_update(self, test_case_data: Dict[str, Any]): "consistent_snapshot": False, "prefix_targets": True, "hash_algo": None, + "targetpaths": ["file", "file.txt", "..file.ext", "f.le"], }, "consistent_snaphot enabled without prefixed targets": { "consistent_snapshot": True, "prefix_targets": False, "hash_algo": None, + "targetpaths": ["file", "file.txt", "..file.ext", "f.le"], }, "consistent_snaphot enabled with prefixed targets": { "consistent_snapshot": True, "prefix_targets": True, "hash_algo": "sha256", + "targetpaths": ["file", "file.txt", "..file.ext", "f.le"], }, } @@ -197,36 +199,32 @@ def test_download_targets(self, test_case_data: Dict[str, Any]): consistent_snapshot: bool = test_case_data["consistent_snapshot"] prefix_targets_with_hash: bool = test_case_data["prefix_targets"] hash_algo: Optional[str] = test_case_data["hash_algo"] - targetpaths = ["file", "file.txt", "..file.ext", "f.le"] + targetpaths: 
List[str] = test_case_data["targetpaths"] sim = self._init_repo(consistent_snapshot, prefix_targets_with_hash) # Add targets to repository for targetpath in targetpaths: sim.targets.version += 1 sim.add_target("targets", b"content", targetpath) + sim.update_snapshot() updater = self._init_updater(sim) updater.config.prefix_targets_with_hash = prefix_targets_with_hash updater.refresh() - with patch.object( - sim, "_fetch_target", wraps=sim._fetch_target - ) as wrapped_fetch_target: - - for targetpath in targetpaths: - info = updater.get_targetinfo(targetpath) - updater.download_target(info) - expected_prefix = ( - None if not hash_algo else info.hashes[hash_algo] - ) - # files are fetched with the expected hash prefix (or None) - wrapped_fetch_target.assert_called_once_with( - info.path, expected_prefix - ) - # target files are always persisted without hash prefix - self._assert_targets_files_exist([info.path]) - wrapped_fetch_target.reset_mock() + expected_result = [] + for targetpath in targetpaths: + info = updater.get_targetinfo(targetpath) + updater.download_target(info) + expected_prefix = None if not hash_algo else info.hashes[hash_algo] + expected_result.append((targetpath, expected_prefix)) + # target files are always persisted without hash prefix + self._assert_targets_files_exist([info.path]) + + + # files are fetched with the expected hash prefix (or None) + self.assertListEqual(sim.fetch_tracker.targets, expected_result) self._cleanup_dir(self.targets_dir) diff --git a/tests/test_updater_key_rotations.py b/tests/test_updater_key_rotations.py index 2aa93b33a7..9855c7f492 100644 --- a/tests/test_updater_key_rotations.py +++ b/tests/test_updater_key_rotations.py @@ -17,7 +17,7 @@ from tests import utils from tests.repository_simulator import RepositorySimulator from tests.utils import run_sub_tests_with_dataset -from tuf.api.metadata import Key, Root +from tuf.api.metadata import Key from tuf.exceptions import UnsignedMetadataError from tuf.ngclient import 
Updater @@ -176,14 +176,14 @@ def test_root_rotation(self, root_versions: List[RootVersion]) -> None: # Publish all remote root versions defined in root_versions for rootver in root_versions: # clear root keys, signers - self.sim.root.roles[Root.type].keyids.clear() - self.sim.signers[Root.type].clear() + self.sim.root.roles["root"].keyids.clear() + self.sim.signers["root"].clear() - self.sim.root.roles[Root.type].threshold = rootver.threshold + self.sim.root.roles["root"].threshold = rootver.threshold for i in rootver.keys: - self.sim.root.add_key(Root.type, self.keys[i]) + self.sim.root.add_key("root", self.keys[i]) for i in rootver.sigs: - self.sim.add_signer(Root.type, self.signers[i]) + self.sim.add_signer("root", self.signers[i]) self.sim.root.version += 1 self.sim.publish_root() diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index 57907fe795..30d9e4b8a0 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -19,7 +19,7 @@ from tests import utils from tuf import exceptions, ngclient, unittest_toolbox -from tuf.api.metadata import Metadata, Root, Snapshot, TargetFile, Targets, Timestamp +from tuf.api.metadata import Metadata, Root, TargetFile logger = logging.getLogger(__name__) @@ -180,17 +180,17 @@ def test_refresh_and_download(self) -> None: # top-level metadata is in local directory already self.updater.refresh() - self._assert_files([Root.type, Snapshot.type, Targets.type, Timestamp.type]) + self._assert_files(["root", "snapshot", "targets", "timestamp"]) # Get targetinfos, assert that cache does not contain files info1 = self.updater.get_targetinfo("file1.txt") assert isinstance(info1, TargetFile) - self._assert_files([Root.type, Snapshot.type, Targets.type, Timestamp.type]) + self._assert_files(["root", "snapshot", "targets", "timestamp"]) # Get targetinfo for 'file3.txt' listed in the delegated role1 info3 = self.updater.get_targetinfo("file3.txt") assert isinstance(info3, TargetFile) - expected_files = ["role1", 
Root.type, Snapshot.type, Targets.type, Timestamp.type] + expected_files = ["role1", "root", "snapshot", "targets", "timestamp"] self._assert_files(expected_files) self.assertIsNone(self.updater.find_cached_target(info1)) self.assertIsNone(self.updater.find_cached_target(info3)) @@ -214,14 +214,14 @@ def test_refresh_with_only_local_root(self) -> None: os.remove(os.path.join(self.client_directory, "role1.json")) os.remove(os.path.join(self.client_directory, "role2.json")) os.remove(os.path.join(self.client_directory, "1.root.json")) - self._assert_files([Root.type]) + self._assert_files(["root"]) self.updater.refresh() - self._assert_files([Root.type, Snapshot.type, Targets.type, Timestamp.type]) + self._assert_files(["root", "snapshot", "targets", "timestamp"]) # Get targetinfo for 'file3.txt' listed in the delegated role1 self.updater.get_targetinfo("file3.txt") - expected_files = ["role1", Root.type, Snapshot.type, Targets.type, Timestamp.type] + expected_files = ["role1", "root", "snapshot", "targets", "timestamp"] self._assert_files(expected_files) def test_implicit_refresh_with_only_local_root(self) -> None: diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index 56e09f16c5..a5b511a60a 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -14,7 +14,7 @@ from tests import utils from tests.repository_simulator import RepositorySimulator -from tuf.api.metadata import TOP_LEVEL_ROLE_NAMES, Metadata, Root, Snapshot, Targets, Timestamp +from tuf.api.metadata import TOP_LEVEL_ROLE_NAMES, Metadata from tuf.exceptions import ( BadVersionNumberError, ExpiredMetadataError, @@ -94,7 +94,7 @@ def _assert_version_equals(self, role: str, expected_version: int) -> None: def test_first_time_refresh(self) -> None: # Metadata dir contains only the mandatory initial root.json - self._assert_files_exist([Root.type]) + self._assert_files_exist(["root"]) # Add one more root version to 
repository so that # refresh() updates from local trusted root (v1) to @@ -106,7 +106,7 @@ def test_first_time_refresh(self) -> None: self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) for role in TOP_LEVEL_ROLE_NAMES: - version = 2 if role == Root.type else None + version = 2 if role == "root" else None self._assert_content_equals(role, version) def test_trusted_root_missing(self) -> None: @@ -129,8 +129,8 @@ def test_trusted_root_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): updater.refresh() - self._assert_files_exist([Root.type]) - self._assert_content_equals(Root.type, 2) + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 2) # Local root metadata can be loaded even if expired updater = self._init_updater() @@ -143,7 +143,7 @@ def test_trusted_root_expired(self) -> None: # Root is successfully updated to latest version self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) - self._assert_content_equals(Root.type, 3) + self._assert_content_equals("root", 3) def test_trusted_root_unsigned(self) -> None: # Local trusted root is not signed @@ -156,7 +156,7 @@ def test_trusted_root_unsigned(self) -> None: self._run_refresh() # The update failed, no changes in metadata - self._assert_files_exist([Root.type]) + self._assert_files_exist(["root"]) md_root_after = Metadata.from_file(root_path) self.assertEqual(md_root.to_bytes(), md_root_after.to_bytes()) @@ -181,7 +181,7 @@ def test_max_root_rotations(self) -> None: # Assert that root version was increased with no more # than 'max_root_rotations' self._assert_version_equals( - Root.type, initial_root_version + updater.config.max_root_rotations + "root", initial_root_version + updater.config.max_root_rotations ) def test_intermediate_root_incorrectly_signed(self) -> None: @@ -189,13 +189,13 @@ def test_intermediate_root_incorrectly_signed(self) -> None: # Intermediate root v2 is unsigned self.sim.root.version += 1 - root_signers = self.sim.signers[Root.type].copy() - 
self.sim.signers[Root.type].clear() + root_signers = self.sim.signers["root"].copy() + self.sim.signers["root"].clear() self.sim.publish_root() # Final root v3 is correctly signed self.sim.root.version += 1 - self.sim.signers[Root.type] = root_signers + self.sim.signers["root"] = root_signers self.sim.publish_root() # Incorrectly signed intermediate root is detected @@ -203,8 +203,8 @@ def test_intermediate_root_incorrectly_signed(self) -> None: self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist([Root.type]) - self._assert_content_equals(Root.type, 1) + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 1) def test_intermediate_root_expired(self) -> None: # The expiration of the new (intermediate) root metadata file @@ -224,20 +224,20 @@ def test_intermediate_root_expired(self) -> None: # Successfully updated to root v3 self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) - self._assert_content_equals(Root.type, 3) + self._assert_content_equals("root", 3) def test_final_root_incorrectly_signed(self) -> None: # Check for an arbitrary software attack self.sim.root.version += 1 # root v2 - self.sim.signers[Root.type].clear() + self.sim.signers["root"].clear() self.sim.publish_root() with self.assertRaises(UnsignedMetadataError): self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist([Root.type]) - self._assert_content_equals(Root.type, 1) + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 1) def test_new_root_same_version(self) -> None: # Check for a rollback_attack @@ -247,8 +247,8 @@ def test_new_root_same_version(self) -> None: self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist([Root.type]) - self._assert_content_equals(Root.type, 1) + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 1) def test_new_root_nonconsecutive_version(self) -> None: # Repository serves non-consecutive 
root version @@ -258,8 +258,8 @@ def test_new_root_nonconsecutive_version(self) -> None: self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist([Root.type]) - self._assert_content_equals(Root.type, 1) + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 1) def test_final_root_expired(self) -> None: # Check for a freeze attack @@ -272,16 +272,16 @@ def test_final_root_expired(self) -> None: self._run_refresh() # The update failed but final root is persisted on the file system - self._assert_files_exist([Root.type]) - self._assert_content_equals(Root.type, 2) + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 2) def test_new_timestamp_unsigned(self) -> None: # Check for an arbitrary software attack - self.sim.signers[Timestamp.type].clear() + self.sim.signers["timestamp"].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() - self._assert_files_exist([Root.type]) + self._assert_files_exist(["root"]) def test_new_timestamp_version_rollback(self) -> None: # Check for a rollback attack @@ -292,7 +292,7 @@ def test_new_timestamp_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - self._assert_version_equals(Timestamp.type, 2) + self._assert_version_equals("timestamp", 2) def test_new_timestamp_snapshot_rollback(self) -> None: # Check for a rollback attack. 
@@ -307,7 +307,7 @@ def test_new_timestamp_snapshot_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - self._assert_version_equals(Timestamp.type, 2) + self._assert_version_equals("timestamp", 2) def test_new_timestamp_expired(self) -> None: # Check for a freeze attack @@ -317,7 +317,7 @@ def test_new_timestamp_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() - self._assert_files_exist([Root.type]) + self._assert_files_exist(["root"]) def test_new_snapshot_hash_mismatch(self) -> None: # Check against timestamp role’s snapshot hash @@ -338,16 +338,16 @@ def test_new_snapshot_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() - self._assert_version_equals(Timestamp.type, 3) - self._assert_version_equals(Snapshot.type, 1) + self._assert_version_equals("timestamp", 3) + self._assert_version_equals("snapshot", 1) def test_new_snapshot_unsigned(self) -> None: # Check for an arbitrary software attack - self.sim.signers[Snapshot.type].clear() + self.sim.signers["snapshot"].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() - self._assert_files_exist([Root.type, Timestamp.type]) + self._assert_files_exist(["root", "timestamp"]) def test_new_snapshot_version_mismatch(self) -> None: # Check against timestamp role’s snapshot version @@ -357,7 +357,7 @@ def test_new_snapshot_version_mismatch(self) -> None: with self.assertRaises(BadVersionNumberError): self._run_refresh() - self._assert_files_exist([Root.type, Timestamp.type]) + self._assert_files_exist(["root", "timestamp"]) def test_new_snapshot_version_rollback(self) -> None: # Check for a rollback attack @@ -371,7 +371,7 @@ def test_new_snapshot_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - self._assert_version_equals(Snapshot.type, 2) + self._assert_version_equals("snapshot", 2) def test_new_snapshot_expired(self) -> None: # 
Check for a freeze attack @@ -381,7 +381,7 @@ def test_new_snapshot_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() - self._assert_files_exist([Root.type, Timestamp.type]) + self._assert_files_exist(["root", "timestamp"]) def test_new_targets_hash_mismatch(self) -> None: # Check against snapshot role’s targets hashes @@ -403,16 +403,16 @@ def test_new_targets_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() - self._assert_version_equals(Snapshot.type, 3) - self._assert_version_equals(Targets.type, 1) + self._assert_version_equals("snapshot", 3) + self._assert_version_equals("targets", 1) def test_new_targets_unsigned(self) -> None: # Check for an arbitrary software attack - self.sim.signers[Targets.type].clear() + self.sim.signers["targets"].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() - self._assert_files_exist([Root.type, Timestamp.type, Snapshot.type]) + self._assert_files_exist(["root", "timestamp", "snapshot"]) def test_new_targets_version_mismatch(self) -> None: # Check against snapshot role’s targets version @@ -422,7 +422,7 @@ def test_new_targets_version_mismatch(self) -> None: with self.assertRaises(BadVersionNumberError): self._run_refresh() - self._assert_files_exist([Root.type, Timestamp.type, Snapshot.type]) + self._assert_files_exist(["root", "timestamp", "snapshot"]) def test_new_targets_expired(self) -> None: # Check for a freeze attack. 
@@ -432,7 +432,7 @@ def test_new_targets_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() - self._assert_files_exist([Root.type, Timestamp.type, Snapshot.type]) + self._assert_files_exist(["root", "timestamp", "snapshot"]) if __name__ == "__main__": diff --git a/tox.ini b/tox.ini index a6e0d765d8..b8359b7772 100644 --- a/tox.ini +++ b/tox.ini @@ -40,9 +40,10 @@ commands = changedir = {toxinidir} commands = # Use different configs for new (tuf/api/*) and legacy code - black --check --diff tuf/api tuf/ngclient - isort --check --diff tuf/api tuf/ngclient - pylint -j 0 tuf/api tuf/ngclient --rcfile=pyproject.toml + # TODO: configure black and isort args in pyproject.toml (see #1161) + black --check --diff --line-length 80 tuf/api tuf/ngclient + isort --check --diff --line-length 80 --profile black -p tuf tuf/api tuf/ngclient + pylint -j 0 tuf/api tuf/ngclient --rcfile=tuf/api/pylintrc # NOTE: Contrary to what the pylint docs suggest, ignoring full paths does # work, unfortunately each subdirectory has to be ignored explicitly. diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 6adcf412da..4ab4360fe5 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -61,11 +61,6 @@ SignedSerializer, ) -_ROOT = "root" -_SNAPSHOT = "snapshot" -_TARGETS = "targets" -_TIMESTAMP = "timestamp" - # pylint: disable=too-many-lines logger = logging.getLogger(__name__) @@ -73,7 +68,7 @@ # We aim to support SPECIFICATION_VERSION and require the input metadata # files to have the same major version (the first number) as ours. 
SPECIFICATION_VERSION = ["1", "0", "19"] -TOP_LEVEL_ROLE_NAMES = {_ROOT, _TIMESTAMP, _SNAPSHOT, _TARGETS} +TOP_LEVEL_ROLE_NAMES = {"root", "timestamp", "snapshot", "targets"} # T is a Generic type constraint for Metadata.signed T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets") @@ -135,13 +130,13 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]": # Dispatch to contained metadata class on metadata _type field. _type = metadata["signed"]["_type"] - if _type == _TARGETS: + if _type == "targets": inner_cls: Type[Signed] = Targets - elif _type == _SNAPSHOT: + elif _type == "snapshot": inner_cls = Snapshot - elif _type == _TIMESTAMP: + elif _type == "timestamp": inner_cls = Timestamp - elif _type == _ROOT: + elif _type == "root": inner_cls = Root else: raise ValueError(f'unrecognized metadata type "{_type}"') @@ -399,13 +394,18 @@ class Signed(metaclass=abc.ABCMeta): unrecognized_fields: Dictionary of all unrecognized fields. """ - # type is required for static reference without changing the API - type: ClassVar[str] = "signed" + # Signed implementations are expected to override this + _signed_type: ClassVar[str] = "signed" # _type and type are identical: 1st replicates file format, 2nd passes lint @property def _type(self) -> str: - return self.type + return self._signed_type + + @property + def type(self) -> str: + """Metadata type as string.""" + return self._signed_type # NOTE: Signed is a stupid name, because this might not be signed yet, but # we keep it to match spec terminology (I often refer to this as "payload", @@ -458,8 +458,8 @@ def _common_fields_from_dict( """ _type = signed_dict.pop("_type") - if _type != cls.type: - raise ValueError(f"Expected type {cls.type}, got {_type}") + if _type != cls._signed_type: + raise ValueError(f"Expected type {cls._signed_type}, got {_type}") version = signed_dict.pop("version") spec_version = signed_dict.pop("spec_version") @@ -539,7 +539,7 @@ def __init__( ): if not all( isinstance(at, str) for at 
in [keyid, keytype, scheme] - ) or not isinstance(keyval, dict): + ) or not isinstance(keyval, Dict): raise TypeError("Unexpected Key attributes types!") self.keyid = keyid self.keytype = keytype @@ -712,7 +712,7 @@ class Root(Signed): unrecognized_fields: Dictionary of all unrecognized fields. """ - type = _ROOT + _signed_type = "root" # TODO: determine an appropriate value for max-args # pylint: disable=too-many-arguments @@ -965,7 +965,7 @@ class Timestamp(Signed): snapshot_meta: Meta information for snapshot metadata. """ - type = _TIMESTAMP + _signed_type = "timestamp" def __init__( self, @@ -1015,7 +1015,7 @@ class Snapshot(Signed): meta: A dictionary of target metadata filenames to MetaFile objects. """ - type = _SNAPSHOT + _signed_type = "snapshot" def __init__( self, @@ -1416,7 +1416,7 @@ class Targets(Signed): unrecognized_fields: Dictionary of all unrecognized fields. """ - type = _TARGETS + _signed_type = "targets" # TODO: determine an appropriate value for max-args # pylint: disable=too-many-arguments @@ -1437,7 +1437,7 @@ def __init__( def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets": """Creates Targets object from its dict representation.""" common_args = cls._common_fields_from_dict(signed_dict) - targets = signed_dict.pop(_TARGETS) + targets = signed_dict.pop("targets") try: delegations_dict = signed_dict.pop("delegations") except KeyError: @@ -1458,7 +1458,7 @@ def to_dict(self) -> Dict[str, Any]: targets = {} for target_path, target_file_obj in self.targets.items(): targets[target_path] = target_file_obj.to_dict() - targets_dict[_TARGETS] = targets + targets_dict["targets"] = targets if self.delegations is not None: targets_dict["delegations"] = self.delegations.to_dict() return targets_dict diff --git a/tuf/api/pylintrc b/tuf/api/pylintrc new file mode 100644 index 0000000000..d9b1da754a --- /dev/null +++ b/tuf/api/pylintrc @@ -0,0 +1,46 @@ +# Minimal pylint configuration file for Secure Systems Lab Python Style Guide: +# 
https://github.com/secure-systems-lab/code-style-guidelines +# +# Based on Google Python Style Guide pylintrc and pylint defaults: +# https://google.github.io/styleguide/pylintrc +# http://pylint.pycqa.org/en/latest/technical_reference/features.html + +[MESSAGES CONTROL] +# Disable the message, report, category or checker with the given id(s). +# NOTE: To keep this config as short as possible we only disable checks that +# are currently in conflict with our code. If new code displeases the linter +# (for good reasons) consider updating this config file, or disable checks with +# 'pylint: disable=XYZ' comments. +disable=fixme, + too-few-public-methods, + too-many-arguments, + format, + duplicate-code, + +[BASIC] +good-names=i,j,k,v,e,f,fn,fp,_type,_ +# Regexes for allowed names are copied from the Google pylintrc +# NOTE: Pylint captures regex name groups such as 'snake_case' or 'camel_case'. +# If there are multiple groups it enforces the prevalent naming style inside +# each module. Names in the exempt capturing group are ignored. 
+function-rgx=^(?:(?PsetUp|tearDown|setUpModule|tearDownModule)|(?P_?[A-Z][a-zA-Z0-9]*)|(?P_?[a-z][a-z0-9_]*))$ +method-rgx=(?x)^(?:(?P_[a-z0-9_]+__|runTest|setUp|tearDown|setUpTestCase|tearDownTestCase|setupSelf|tearDownClass|setUpClass|(test|assert)_*[A-Z0-9][a-zA-Z0-9_]*|next)|(?P_{0,2}[A-Z][a-zA-Z0-9_]*)|(?P_{0,2}[a-z][a-z0-9_]*))$ +argument-rgx=^[a-z][a-z0-9_]*$ +attr-rgx=^_{0,2}[a-z][a-z0-9_]*$ +class-attribute-rgx=^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$ +class-rgx=^_?[A-Z][a-zA-Z0-9]*$ +const-rgx=^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$ +inlinevar-rgx=^[a-z][a-z0-9_]*$ +module-rgx=^(_?[a-z][a-z0-9_]*|__init__)$ +no-docstring-rgx=(__.*__|main|test.*|.*test|.*Test)$ +variable-rgx=^[a-z][a-z0-9_]*$ +docstring-min-length=10 + +[LOGGING] +logging-format-style=old + +[MISCELLANEOUS] +notes=TODO + +[STRING] +check-quote-consistency=yes diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index e502609cd0..b7c831158c 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -10,7 +10,7 @@ network IO, which are not handled here. Loaded metadata can be accessed via index access with rolename as key -(trusted_set[Root.type]) or, in the case of top-level metadata, using the helper +(trusted_set["root"]) or, in the case of top-level metadata, using the helper properties (trusted_set.root). 
The rules that TrustedMetadataSet follows for top-level metadata are @@ -35,7 +35,7 @@ >>> trusted_set = TrustedMetadataSet(f.read()) >>> >>> # update root from remote until no more are available ->>> with download(Root.type, trusted_set.root.signed.version + 1) as f: +>>> with download("root", trusted_set.root.signed.version + 1) as f: >>> trusted_set.update_root(f.read()) >>> >>> # load local timestamp, then update from remote @@ -45,7 +45,7 @@ >>> except (RepositoryError, OSError): >>> pass # failure to load a local file is ok >>> ->>> with download(Timestamp.type) as f: +>>> with download("timestamp") as f: >>> trusted_set.update_timestamp(f.read()) >>> >>> # load local snapshot, then update from remote if needed @@ -55,7 +55,7 @@ >>> except (RepositoryError, OSError): >>> # local snapshot is not valid, load from remote >>> # (RepositoryErrors here stop the update) ->>> with download(Snapshot.type, version) as f: +>>> with download("snapshot", version) as f: >>> trusted_set.update_snapshot(f.read()) TODO: @@ -123,22 +123,22 @@ def __iter__(self) -> Iterator[Metadata]: @property def root(self) -> Metadata[Root]: """Current root Metadata""" - return self._trusted_set[Root.type] + return self._trusted_set["root"] @property def timestamp(self) -> Optional[Metadata[Timestamp]]: """Current timestamp Metadata or None""" - return self._trusted_set.get(Timestamp.type) + return self._trusted_set.get("timestamp") @property def snapshot(self) -> Optional[Metadata[Snapshot]]: """Current snapshot Metadata or None""" - return self._trusted_set.get(Snapshot.type) + return self._trusted_set.get("snapshot") @property def targets(self) -> Optional[Metadata[Targets]]: """Current targets Metadata or None""" - return self._trusted_set.get(Targets.type) + return self._trusted_set.get("targets") # Methods for updating metadata def update_root(self, data: bytes) -> Metadata[Root]: @@ -166,25 +166,23 @@ def update_root(self, data: bytes) -> Metadata[Root]: except DeserializationError as 
e: raise exceptions.RepositoryError("Failed to load root") from e - if new_root.signed.type != Root.type: + if new_root.signed.type != "root": raise exceptions.RepositoryError( f"Expected 'root', got '{new_root.signed.type}'" ) # Verify that new root is signed by trusted root - self.root.verify_delegate(Root.type, new_root) + self.root.verify_delegate("root", new_root) if new_root.signed.version != self.root.signed.version + 1: raise exceptions.ReplayedMetadataError( - Root.type, - new_root.signed.version, - self.root.signed.version, + "root", new_root.signed.version, self.root.signed.version ) # Verify that new root is signed by itself - new_root.verify_delegate(Root.type, new_root) + new_root.verify_delegate("root", new_root) - self._trusted_set[Root.type] = new_root + self._trusted_set["root"] = new_root logger.info("Updated root v%d", new_root.signed.version) return new_root @@ -224,12 +222,12 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: except DeserializationError as e: raise exceptions.RepositoryError("Failed to load timestamp") from e - if new_timestamp.signed.type != Timestamp.type: + if new_timestamp.signed.type != "timestamp": raise exceptions.RepositoryError( f"Expected 'timestamp', got '{new_timestamp.signed.type}'" ) - self.root.verify_delegate(Timestamp.type, new_timestamp) + self.root.verify_delegate("timestamp", new_timestamp) # If an existing trusted timestamp is updated, # check for a rollback attack @@ -237,7 +235,7 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: # Prevent rolling back timestamp version if new_timestamp.signed.version < self.timestamp.signed.version: raise exceptions.ReplayedMetadataError( - Timestamp.type, + "timestamp", new_timestamp.signed.version, self.timestamp.signed.version, ) @@ -247,7 +245,7 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: < self.timestamp.signed.snapshot_meta.version ): raise exceptions.ReplayedMetadataError( - Snapshot.type, + "snapshot", 
new_timestamp.signed.snapshot_meta.version, self.timestamp.signed.snapshot_meta.version, ) @@ -255,7 +253,7 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: # expiry not checked to allow old timestamp to be used for rollback # protection of new timestamp: expiry is checked in update_snapshot() - self._trusted_set[Timestamp.type] = new_timestamp + self._trusted_set["timestamp"] = new_timestamp logger.info("Updated timestamp v%d", new_timestamp.signed.version) # timestamp is loaded: raise if it is not valid _final_ timestamp @@ -325,12 +323,12 @@ def update_snapshot( except DeserializationError as e: raise exceptions.RepositoryError("Failed to load snapshot") from e - if new_snapshot.signed.type != Snapshot.type: + if new_snapshot.signed.type != "snapshot": raise exceptions.RepositoryError( f"Expected 'snapshot', got '{new_snapshot.signed.type}'" ) - self.root.verify_delegate(Snapshot.type, new_snapshot) + self.root.verify_delegate("snapshot", new_snapshot) # version not checked against meta version to allow old snapshot to be # used in rollback protection: it is checked when targets is updated @@ -356,7 +354,7 @@ def update_snapshot( # expiry not checked to allow old snapshot to be used for rollback # protection of new snapshot: it is checked when targets is updated - self._trusted_set[Snapshot.type] = new_snapshot + self._trusted_set["snapshot"] = new_snapshot logger.info("Updated snapshot v%d", new_snapshot.signed.version) # snapshot is loaded, but we raise if it's not valid _final_ snapshot @@ -391,7 +389,7 @@ def update_targets(self, data: bytes) -> Metadata[Targets]: Returns: Deserialized and verified targets Metadata object """ - return self.update_delegated_targets(data, Targets.type, Root.type) + return self.update_delegated_targets(data, "targets", "root") def update_delegated_targets( self, data: bytes, role_name: str, delegator_name: str @@ -442,7 +440,7 @@ def update_delegated_targets( except DeserializationError as e: raise 
exceptions.RepositoryError("Failed to load snapshot") from e - if new_delegate.signed.type != Targets.type: + if new_delegate.signed.type != "targets": raise exceptions.RepositoryError( f"Expected 'targets', got '{new_delegate.signed.type}'" ) @@ -474,12 +472,12 @@ def _load_trusted_root(self, data: bytes) -> None: except DeserializationError as e: raise exceptions.RepositoryError("Failed to load root") from e - if new_root.signed.type != Root.type: + if new_root.signed.type != "root": raise exceptions.RepositoryError( f"Expected 'root', got '{new_root.signed.type}'" ) - new_root.verify_delegate(Root.type, new_root) + new_root.verify_delegate("root", new_root) - self._trusted_set[Root.type] = new_root + self._trusted_set["root"] = new_root logger.info("Loaded trusted root v%d", new_root.signed.version) diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 955d930df6..649a2b4bc3 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -68,14 +68,7 @@ from securesystemslib import util as sslib_util from tuf import exceptions -from tuf.api.metadata import ( - Metadata, - Root, - Snapshot, - TargetFile, - Targets, - Timestamp, -) +from tuf.api.metadata import Metadata, TargetFile, Targets from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set from tuf.ngclient.config import UpdaterConfig from tuf.ngclient.fetcher import FetcherInterface @@ -121,7 +114,7 @@ def __init__( self._target_base_url = _ensure_trailing_slash(target_base_url) # Read trusted local root metadata - data = self._load_local_metadata(Root.type) + data = self._load_local_metadata("root") self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data) self._fetcher = fetcher or requests_fetcher.RequestsFetcher() self.config = config or UpdaterConfig() @@ -153,7 +146,7 @@ def refresh(self) -> None: self._load_root() self._load_timestamp() self._load_snapshot() - self._load_targets(Targets.type, Root.type) + self._load_targets("targets", "root") def 
_generate_target_file_path(self, targetinfo: TargetFile) -> str: if self.target_dir is None: @@ -327,12 +320,10 @@ def _load_root(self) -> None: for next_version in range(lower_bound, upper_bound): try: data = self._download_metadata( - Root.type, - self.config.root_max_length, - next_version, + "root", self.config.root_max_length, next_version ) self._trusted_set.update_root(data) - self._persist_metadata(Root.type, data) + self._persist_metadata("root", data) except exceptions.FetcherHTTPError as exception: if exception.status_code not in {403, 404}: @@ -343,7 +334,7 @@ def _load_root(self) -> None: def _load_timestamp(self) -> None: """Load local and remote timestamp metadata""" try: - data = self._load_local_metadata(Timestamp.type) + data = self._load_local_metadata("timestamp") self._trusted_set.update_timestamp(data) except (OSError, exceptions.RepositoryError) as e: # Local timestamp does not exist or is invalid @@ -351,15 +342,15 @@ def _load_timestamp(self) -> None: # Load from remote (whether local load succeeded or not) data = self._download_metadata( - Timestamp.type, self.config.timestamp_max_length + "timestamp", self.config.timestamp_max_length ) self._trusted_set.update_timestamp(data) - self._persist_metadata(Timestamp.type, data) + self._persist_metadata("timestamp", data) def _load_snapshot(self) -> None: """Load local (and if needed remote) snapshot metadata""" try: - data = self._load_local_metadata(Snapshot.type) + data = self._load_local_metadata("snapshot") self._trusted_set.update_snapshot(data, trusted=True) logger.debug("Local snapshot is valid: not downloading new one") except (OSError, exceptions.RepositoryError) as e: @@ -373,9 +364,9 @@ def _load_snapshot(self) -> None: if self._trusted_set.root.signed.consistent_snapshot: version = snapshot_meta.version - data = self._download_metadata(Snapshot.type, length, version) + data = self._download_metadata("snapshot", length, version) self._trusted_set.update_snapshot(data) - 
self._persist_metadata(Snapshot.type, data) + self._persist_metadata("snapshot", data) def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]: """Load local (and if needed remote) metadata for 'role'.""" @@ -421,7 +412,7 @@ def _preorder_depth_first_walk( # List of delegations to be interrogated. A (role, parent role) pair # is needed to load and verify the delegated targets metadata. - delegations_to_visit = [(Targets.type, Root.type)] + delegations_to_visit = [("targets", "root")] visited_role_names: Set[str] = set() number_of_delegations = self.config.max_delegations