diff --git a/subiquity/common/filesystem/boot.py b/subiquity/common/filesystem/boot.py index d86e03dbd..90e8807fb 100644 --- a/subiquity/common/filesystem/boot.py +++ b/subiquity/common/filesystem/boot.py @@ -63,6 +63,9 @@ class MakeBootDevicePlan(abc.ABC): this a boot device" in sync. """ + def new_partition_count(self) -> int: + return 0 + @abc.abstractmethod def apply(self, manipulator): pass @@ -77,6 +80,10 @@ class CreatePartPlan(MakeBootDevicePlan): spec: dict = attr.ib(factory=dict) args: dict = attr.ib(factory=dict) + # TODO add @typing.override decorator when we switch to core24. + def new_partition_count(self) -> int: + return 1 + def apply(self, manipulator): manipulator.create_partition(self.gap.device, self.gap, self.spec, **self.args) @@ -156,6 +163,10 @@ def apply(self, manipulator): for plan in self.plans: plan.apply(manipulator) + # TODO add @typing.override decorator when we switch to core24. + def new_partition_count(self) -> int: + return sum([plan.new_partition_count() for plan in self.plans]) + def get_boot_device_plan_bios(device) -> Optional[MakeBootDevicePlan]: attr_plan = SetAttrPlan(device, "grub_device", True) diff --git a/subiquity/common/filesystem/tests/test_boot.py b/subiquity/common/filesystem/tests/test_boot.py new file mode 100644 index 000000000..51b014ca1 --- /dev/null +++ b/subiquity/common/filesystem/tests/test_boot.py @@ -0,0 +1,71 @@ +# Copyright 2024 Canonical, Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import unittest +from unittest.mock import Mock + +import attr + +from subiquity.common.filesystem.boot import ( + CreatePartPlan, + MountBootEfiPlan, + MultiStepPlan, + NoOpBootPlan, + ResizePlan, + SetAttrPlan, + SlidePlan, +) + + +class TestMakeBootDevicePlan(unittest.TestCase): + @unittest.skipUnless( + hasattr(attr.validators, "disabled"), + "this test requires attr.validators.disabled context manager", + ) + def test_new_partition_count__single(self): + self.assertEqual(1, CreatePartPlan(Mock()).new_partition_count()) + with attr.validators.disabled(): + self.assertEqual(0, ResizePlan(Mock()).new_partition_count()) + with attr.validators.disabled(): + self.assertEqual(0, SlidePlan(Mock()).new_partition_count()) + self.assertEqual(0, SetAttrPlan(Mock(), Mock(), Mock()).new_partition_count()) + self.assertEqual(0, MountBootEfiPlan(Mock()).new_partition_count()) + self.assertEqual(0, NoOpBootPlan().new_partition_count()) + + def test_new_partition_count__multi_step(self): + self.assertEqual(0, MultiStepPlan([]).new_partition_count()) + + self.assertEqual( + 3, + MultiStepPlan( + [ + CreatePartPlan(Mock()), + CreatePartPlan(Mock()), + CreatePartPlan(Mock()), + ] + ).new_partition_count(), + ) + + self.assertEqual( + 2, + MultiStepPlan( + [ + CreatePartPlan(Mock()), + CreatePartPlan(Mock()), + MountBootEfiPlan(Mock()), + NoOpBootPlan(), + ] + ).new_partition_count(), + ) diff --git a/subiquity/server/controllers/filesystem.py b/subiquity/server/controllers/filesystem.py index f03508e32..f79d08b2d 100644 --- a/subiquity/server/controllers/filesystem.py +++ b/subiquity/server/controllers/filesystem.py @@ -1096,36 +1096,10 @@ def get_classic_capabilities(self): classic_capabilities.update(info.capability_info.allowed) return sorted(classic_capabilities) - async def v2_guided_GET(self, wait: bool = False) -> GuidedStorageResponseV2: - """Acquire a list of possible guided storage configuration scenarios. - Results are sorted by the size of the space potentially available to - the install.""" - probe_resp = await self._probe_response(wait, GuidedStorageResponseV2) - if probe_resp is not None: - return probe_resp - - scenarios = [] - install_min = self.calculate_suggested_install_min() - - classic_capabilities = self.get_classic_capabilities() - - if GuidedCapability.DIRECT in classic_capabilities: - scenarios.append((0, GuidedStorageTargetManual())) - - for disk in self.potential_boot_disks(with_reformatting=True): - capability_info = CapabilityInfo() - for variation in self._variation_info.values(): - gap = gaps.largest_gap(disk._reformatted()) - capability_info.combine( - variation.capability_info_for_gap(gap, install_min) - ) - reformat = GuidedStorageTargetReformat( - disk_id=disk.id, - allowed=capability_info.allowed, - disallowed=capability_info.disallowed, - ) - scenarios.append((disk.size, reformat)) - + def available_use_gap_scenarios( + self, install_min + ) -> list[tuple[int, GuidedStorageTargetUseGap]]: + scenarios: list[tuple[int, GuidedStorageTargetUseGap]] = [] for disk in self.potential_boot_disks(with_reformatting=False): parts = [ p @@ -1141,6 +1115,24 @@ async def v2_guided_GET(self, wait: bool = False) -> GuidedStorageResponseV2: if gap is None: # Do we return a reason here? continue + + # Now we check if we have enough room for all the primary + # partitions. This isn't failproof but should limit the number of + # UseGaps scenarios that are suggested but can't be applied because + # we don't have enough room for partitions. + new_primary_parts = 0 + if not gap.in_extended: + new_primary_parts += 1 # For the rootfs to create. + new_primary_parts += boot.get_boot_device_plan(disk).new_partition_count() + # In theory, there could be a recovery partition as well. Not sure + # how to account for it since we don't know yet if one will be + # requested. + if new_primary_parts > gaps.remaining_primary_partitions( + disk, disk.alignment_data() + ): + log.error("skipping UseGap: not enough room for primary partitions") + continue + capability_info = CapabilityInfo() for variation in self._variation_info.values(): if variation.is_core_boot_classic(): @@ -1156,6 +1148,39 @@ async def v2_guided_GET(self, wait: bool = False) -> GuidedStorageResponseV2: disallowed=capability_info.disallowed, ) scenarios.append((gap.size, use_gap)) + return scenarios + + async def v2_guided_GET(self, wait: bool = False) -> GuidedStorageResponseV2: + """Acquire a list of possible guided storage configuration scenarios. + Results are sorted by the size of the space potentially available to + the install.""" + probe_resp = await self._probe_response(wait, GuidedStorageResponseV2) + if probe_resp is not None: + return probe_resp + + scenarios = [] + install_min = self.calculate_suggested_install_min() + + classic_capabilities = self.get_classic_capabilities() + + if GuidedCapability.DIRECT in classic_capabilities: + scenarios.append((0, GuidedStorageTargetManual())) + + for disk in self.potential_boot_disks(with_reformatting=True): + capability_info = CapabilityInfo() + for variation in self._variation_info.values(): + gap = gaps.largest_gap(disk._reformatted()) + capability_info.combine( + variation.capability_info_for_gap(gap, install_min) + ) + reformat = GuidedStorageTargetReformat( + disk_id=disk.id, + allowed=capability_info.allowed, + disallowed=capability_info.disallowed, + ) + scenarios.append((disk.size, reformat)) + + scenarios.extend(self.available_use_gap_scenarios(install_min)) for disk in self.potential_boot_disks(check_boot=False): part_align = disk.alignment_data().part_align diff --git a/subiquity/server/controllers/tests/test_filesystem.py b/subiquity/server/controllers/tests/test_filesystem.py index 70a751c15..859f33a08 100644 --- a/subiquity/server/controllers/tests/test_filesystem.py +++ b/subiquity/server/controllers/tests/test_filesystem.py @@ -24,7 +24,7 @@ from curtin.commands.extract import TrivialSourceHandler from jsonschema.validators import validator_for -from subiquity.common.filesystem import gaps, labels +from subiquity.common.filesystem import boot, gaps, labels from subiquity.common.filesystem.actions import DeviceAction from subiquity.common.types import ( AddPartitionV2, @@ -1299,17 +1299,65 @@ async def test_in_use_reformat_and_gap(self, bootloader, ptable): GuidedStorageTargetReformat( disk_id=self.disk.id, allowed=default_capabilities ), - GuidedStorageTargetUseGap( - disk_id=self.disk.id, - allowed=default_capabilities, - gap=labels.for_client(gaps.largest_gap(self.disk)), - ), GuidedStorageTargetManual(), ] + # VTOC does not have enough room for an ESP + 3 partitions, presumably. + if ptable != "vtoc" or bootloader == Bootloader.NONE: + expected.insert( + 1, + GuidedStorageTargetUseGap( + disk_id=self.disk.id, + allowed=default_capabilities, + gap=labels.for_client(gaps.largest_gap(self.disk)), + ), + ) resp = await self.fsc.v2_guided_GET() self.assertEqual(expected, resp.targets) self.assertEqual(ProbeStatus.DONE, resp.status) + @parameterized.expand( + ( + (1, 4, True, True), + (1, 4, False, True), + (4, 4, True, False), + (4, 4, False, False), + (3, 4, False, True), + (3, 4, True, False), + (127, 128, True, False), + (127, 128, False, True), + ) + ) + async def test_available_use_gap_scenarios( + self, + existing_primaries: int, + max_primaries: int, + create_boot_part: bool, + expected_scenario: bool, + ): + await self._setup(Bootloader.NONE, "gpt", fix_bios=True) + install_min = self.fsc.calculate_suggested_install_min() + + for _ in range(existing_primaries): + make_partition(self.model, self.disk, preserve=True, size=4 << 20) + + p_max_primaries = mock.patch.object( + self.fsc.model._partition_alignment_data["gpt"], + "primary_part_limit", + max_primaries, + ) + if create_boot_part: + boot_plan = boot.CreatePartPlan(mock.Mock(), mock.Mock(), mock.Mock()) + else: + boot_plan = boot.NoOpBootPlan() + p_boot_plan = mock.patch( + "subiquity.server.controllers.filesystem.boot.get_boot_device_plan", + return_value=boot_plan, + ) + with p_max_primaries, p_boot_plan: + scenarios = self.fsc.available_use_gap_scenarios(install_min) + + self.assertEqual(expected_scenario, scenarios != []) + class TestManualBoot(IsolatedAsyncioTestCase): def _setup(self, bootloader, ptable, **kw):