Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 74 additions & 110 deletions tests/test_updater_ng.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import tuf.unittest_toolbox as unittest_toolbox

from tests import utils
from tuf.api.metadata import Metadata
from tuf.api.metadata import Metadata, TargetFile
from tuf import exceptions, ngclient
from securesystemslib.signer import SSlibSigner
from securesystemslib.interface import import_rsa_privatekey_from_file
Expand Down Expand Up @@ -111,11 +111,14 @@ def setUp(self):

self.metadata_url = f"{url_prefix}/metadata/"
self.targets_url = f"{url_prefix}/targets/"
self.destination_directory = self.make_temp_directory()
self.dl_dir = self.make_temp_directory()
# Creating a repository instance. The test cases will use this client
# updater to refresh metadata, fetch target files, etc.
self.repository_updater = ngclient.Updater(
self.client_directory, self.metadata_url, self.targets_url
self.updater = ngclient.Updater(
repository_dir=self.client_directory,
metadata_base_url=self.metadata_url,
target_dir=self.dl_dir,
target_base_url=self.targets_url
)

def tearDown(self):
Expand Down Expand Up @@ -187,53 +190,40 @@ def consistent_snapshot_modifier(root):
self._modify_repository_root(
consistent_snapshot_modifier, bump_version=True
)
self.repository_updater = ngclient.Updater(
self.client_directory, self.metadata_url, self.targets_url
updater = ngclient.Updater(
self.client_directory, self.metadata_url, self.dl_dir, self.targets_url
)

# All metadata is in local directory already
self.repository_updater.refresh()
updater.refresh()
# Make sure that consistent snapshot is enabled
self.assertTrue(
self.repository_updater._trusted_set.root.signed.consistent_snapshot
)
# Get targetinfo for "file1.txt" listed in targets
targetinfo1 = self.repository_updater.get_one_valid_targetinfo(
"file1.txt"
)
# Get targetinfo for "file3.txt" listed in the delegated role1
targetinfo3 = self.repository_updater.get_one_valid_targetinfo(
"file3.txt"
updater._trusted_set.root.signed.consistent_snapshot
)

# Get targetinfos, assert cache does not contain the files
info1 = updater.get_targetinfo("file1.txt")
info3 = updater.get_targetinfo("file3.txt")
self.assertIsNone(updater.find_cached_target(info1))
self.assertIsNone(updater.find_cached_target(info3))

# Create consistent targets with file path HASH.FILENAME.EXT
target1_hash = list(targetinfo1.hashes.values())[0]
target3_hash = list(targetinfo3.hashes.values())[0]
target1_hash = list(info1.hashes.values())[0]
target3_hash = list(info3.hashes.values())[0]
self._create_consistent_target("file1.txt", target1_hash)
self._create_consistent_target("file3.txt", target3_hash)

updated_targets = self.repository_updater.updated_targets(
[targetinfo1, targetinfo3], self.destination_directory
)

self.assertListEqual(updated_targets, [targetinfo1, targetinfo3])
self.repository_updater.download_target(
targetinfo1, self.destination_directory
)
updated_targets = self.repository_updater.updated_targets(
updated_targets, self.destination_directory
)

self.assertListEqual(updated_targets, [targetinfo3])

self.repository_updater.download_target(
targetinfo3, self.destination_directory
)
updated_targets = self.repository_updater.updated_targets(
updated_targets, self.destination_directory
)
# Download files, assert that cache has correct files
updater.download_target(info1)
path = updater.find_cached_target(info1)
self.assertEqual(path, os.path.join(self.dl_dir, info1.path))
self.assertIsNone(updater.find_cached_target(info3))

self.assertListEqual(updated_targets, [])
updater.download_target(info3)
path = updater.find_cached_target(info1)
self.assertEqual(path, os.path.join(self.dl_dir, info1.path))
path = updater.find_cached_target(info3)
self.assertEqual(path, os.path.join(self.dl_dir, info3.path))

def test_refresh_and_download(self):
# Test refresh without consistent targets - targets without hash prefixes.
Expand All @@ -244,50 +234,33 @@ def test_refresh_and_download(self):
os.remove(os.path.join(self.client_directory, "1.root.json"))

# top-level metadata is in local directory already
self.repository_updater.refresh()
self.updater.refresh()
self._assert_files(["root", "snapshot", "targets", "timestamp"])

# Get targetinfo for 'file1.txt' listed in targets
targetinfo1 = self.repository_updater.get_one_valid_targetinfo(
"file1.txt"
)
# Get targetinfos, assert that cache does not contain files
info1 = self.updater.get_targetinfo("file1.txt")
self._assert_files(["root", "snapshot", "targets", "timestamp"])

# Get targetinfo for 'file3.txt' listed in the delegated role1
targetinfo3 = self.repository_updater.get_one_valid_targetinfo(
"file3.txt"
)
info3 = self.updater.get_targetinfo("file3.txt")
expected_files = ["role1", "root", "snapshot", "targets", "timestamp"]
self._assert_files(expected_files)
self.assertIsNone(self.updater.find_cached_target(info1))
self.assertIsNone(self.updater.find_cached_target(info3))

updated_targets = self.repository_updater.updated_targets(
[targetinfo1, targetinfo3], self.destination_directory
)

self.assertListEqual(updated_targets, [targetinfo1, targetinfo3])

self.repository_updater.download_target(
targetinfo1, self.destination_directory
)
updated_targets = self.repository_updater.updated_targets(
updated_targets, self.destination_directory
# Download files, assert that cache has correct files
self.updater.download_target(info1)
path = self.updater.find_cached_target(info1)
self.assertEqual(path, os.path.join(self.dl_dir, info1.path))
self.assertIsNone(
self.updater.find_cached_target(info3)
)

self.assertListEqual(updated_targets, [targetinfo3])

# Check that duplicates are excluded
updated_targets = self.repository_updater.updated_targets(
[targetinfo3, targetinfo3], self.destination_directory
)
self.assertListEqual(updated_targets, [targetinfo3])

self.repository_updater.download_target(
targetinfo3, self.destination_directory
)
updated_targets = self.repository_updater.updated_targets(
updated_targets, self.destination_directory
)

self.assertListEqual(updated_targets, [])
self.updater.download_target(info3)
path = self.updater.find_cached_target(info1)
self.assertEqual(path, os.path.join(self.dl_dir, info1.path))
path = self.updater.find_cached_target(info3)
self.assertEqual(path, os.path.join(self.dl_dir, info3.path))

def test_refresh_with_only_local_root(self):
os.remove(os.path.join(self.client_directory, "timestamp.json"))
Expand All @@ -298,70 +271,61 @@ def test_refresh_with_only_local_root(self):
os.remove(os.path.join(self.client_directory, "1.root.json"))
self._assert_files(["root"])

self.repository_updater.refresh()
self.updater.refresh()
self._assert_files(["root", "snapshot", "targets", "timestamp"])

# Get targetinfo for 'file3.txt' listed in the delegated role1
targetinfo3 = self.repository_updater.get_one_valid_targetinfo(
"file3.txt"
)
targetinfo3 = self.updater.get_targetinfo("file3.txt")
expected_files = ["role1", "root", "snapshot", "targets", "timestamp"]
self._assert_files(expected_files)

def test_both_target_urls_not_set(self):
# target_base_url = None and Updater._target_base_url = None
self.repository_updater = ngclient.Updater(
self.client_directory, self.metadata_url
)
updater = ngclient.Updater(self.client_directory, self.metadata_url, self.dl_dir)
info = TargetFile(1, {"sha256": ""}, "targetpath")
with self.assertRaises(ValueError):
self.repository_updater.download_target(
[], self.destination_directory
)
updater.download_target(info)

def test_no_target_dir_no_filepath(self):
# filepath = None and Updater.target_dir = None
updater = ngclient.Updater(self.client_directory, self.metadata_url)
info = TargetFile(1, {"sha256": ""}, "targetpath")
with self.assertRaises(ValueError):
updater.find_cached_target(info)
with self.assertRaises(ValueError):
updater.download_target(info)

def test_external_targets_url(self):
self.repository_updater.refresh()
targetinfo = self.repository_updater.get_one_valid_targetinfo(
"file1.txt"
)
self.repository_updater.download_target(
targetinfo, self.destination_directory, self.targets_url
)
self.updater.refresh()
info = self.updater.get_targetinfo("file1.txt")

self.updater.download_target(info, target_base_url=self.targets_url)

def test_length_hash_mismatch(self):
self.repository_updater.refresh()
targetinfo = self.repository_updater.get_one_valid_targetinfo(
"file1.txt"
)
self.updater.refresh()
targetinfo = self.updater.get_targetinfo("file1.txt")

length = targetinfo.length
with self.assertRaises(exceptions.RepositoryError):
targetinfo.length = 44
self.repository_updater.download_target(
targetinfo, self.destination_directory
)
self.updater.download_target(targetinfo)

with self.assertRaises(exceptions.RepositoryError):
targetinfo.length = length
targetinfo.hashes = {"sha256": "abcd"}
self.repository_updater.download_target(
targetinfo, self.destination_directory
)
self.updater.download_target(targetinfo)

def test_updating_root(self):
# Bump root version, resign and refresh
self._modify_repository_root(lambda root: None, bump_version=True)
self.repository_updater.refresh()
self.assertEqual(
self.repository_updater._trusted_set.root.signed.version, 2
)
self.updater.refresh()
self.assertEqual(self.updater._trusted_set.root.signed.version, 2)

def test_missing_targetinfo(self):
self.repository_updater.refresh()
self.updater.refresh()

# Get targetinfo for non-existing file
targetinfo = self.repository_updater.get_one_valid_targetinfo(
"file33.txt"
)
self.assertIsNone(targetinfo)
self.assertIsNone(self.updater.get_targetinfo("file33.txt"))


if __name__ == "__main__":
Expand Down
46 changes: 23 additions & 23 deletions tests/test_updater_with_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def _run_refresh(self) -> Updater:
updater = Updater(
self.metadata_dir,
"https://example.com/metadata/",
self.targets_dir,
"https://example.com/targets/",
self.sim
)
Expand Down Expand Up @@ -90,9 +91,13 @@ def test_refresh(self):
@utils.run_sub_tests_with_dataset(targets)
def test_targets(self, test_case_data: Tuple[str, bytes, str]):
targetpath, content, encoded_path = test_case_data
# target does not exist yet
path = os.path.join(self.targets_dir, encoded_path)

updater = self._run_refresh()
self.assertIsNone(updater.get_one_valid_targetinfo(targetpath))
# target does not exist yet, configuration is what we expect
self.assertIsNone(updater.get_targetinfo(targetpath))
self.assertTrue(self.sim.root.consistent_snapshot)
self.assertTrue(updater.config.prefix_targets_with_hash)

# Add targets to repository
self.sim.targets.version += 1
Expand All @@ -101,30 +106,25 @@ def test_targets(self, test_case_data: Tuple[str, bytes, str]):

updater = self._run_refresh()
# target now exists, is not in cache yet
file_info = updater.get_one_valid_targetinfo(targetpath)
self.assertIsNotNone(file_info)
self.assertEqual(
updater.updated_targets([file_info], self.targets_dir),
[file_info]
)
info = updater.get_targetinfo(targetpath)
self.assertIsNotNone(info)
# Test without and with explicit local filepath
self.assertIsNone(updater.find_cached_target(info))
self.assertIsNone(updater.find_cached_target(info, path))

# Assert consistent_snapshot is True and downloaded targets have prefix.
self.assertTrue(self.sim.root.consistent_snapshot)
self.assertTrue(updater.config.prefix_targets_with_hash)
# download target, assert it is in cache and content is correct
local_path = updater.download_target(file_info, self.targets_dir)
self.assertEqual(
updater.updated_targets([file_info], self.targets_dir), []
)
self.assertTrue(local_path.startswith(self.targets_dir))
with open(local_path, "rb") as f:
self.assertEqual(f.read(), content)

# Assert that the targetpath was URL encoded as expected.
encoded_absolute_path = os.path.join(self.targets_dir, encoded_path)
self.assertEqual(local_path, encoded_absolute_path)
self.assertEqual(path, updater.download_target(info))
self.assertEqual(path, updater.find_cached_target(info))
self.assertEqual(path, updater.find_cached_target(info, path))

with open(path, "rb") as f:
self.assertEqual(f.read(), content)

# download using explicit filepath as well
os.remove(path)
self.assertEqual(path, updater.download_target(info, path))
self.assertEqual(path, updater.find_cached_target(info))
self.assertEqual(path, updater.find_cached_target(info, path))

def test_fishy_rolenames(self):
roles_to_filenames = {
Expand All @@ -145,7 +145,7 @@ def test_fishy_rolenames(self):
updater = self._run_refresh()

# trigger updater to fetch the delegated metadata, check filenames
updater.get_one_valid_targetinfo("anything")
updater.get_targetinfo("anything")
local_metadata = os.listdir(self.metadata_dir)
for fname in roles_to_filenames.values():
self.assertTrue(fname in local_metadata)
Expand Down
Loading