diff --git a/blivet/devices/partition.py b/blivet/devices/partition.py index 4a9c96aeb..e1693e951 100644 --- a/blivet/devices/partition.py +++ b/blivet/devices/partition.py @@ -69,6 +69,8 @@ class PartitionDevice(StorageDevice): _resizable = True default_size = DEFAULT_PART_SIZE + config_actions_map = {"parted_flags": "_set_parted_flags"} + def __init__(self, name, fmt=None, uuid=None, size=None, grow=False, maxsize=None, start=None, end=None, major=None, minor=None, bootable=None, @@ -163,6 +165,8 @@ def __init__(self, name, fmt=None, uuid=None, self._part_type_name = None self._mountpoint = mountpoint + self._parted_flags = [] + if not exists and size is None: if start is not None and end is not None: size = Size(0) @@ -609,6 +613,48 @@ def unset_flag(self, flag): self.parted_partition.unsetFlag(flag) + @property + def parted_flags(self): + """ Parted flags (as list of strings) currently set for this partition """ + if self._parted_flags: + return self._parted_flags + + parted_flags = [] + + if not self.parted_partition: + return parted_flags + + for fid, flag in parted.partitionFlag.items(): + if self.get_flag(fid): + parted_flags.append(flag) + + self._parted_flags = parted_flags + return self._parted_flags + + @parted_flags.setter + def parted_flags(self, parted_flags): + self._parted_flags = parted_flags + + def _set_parted_flags(self, new_parted_flags, old_parted_flags, dry_run): + if not self.exists: + raise ValueError("device has not been created") + + if not self.parted_partition: + raise ValueError("cannot set flags %s" % self.name) + + if sorted(new_parted_flags) == sorted(old_parted_flags): + raise ValueError("flags are already set to %s" % ", ".join(new_parted_flags)) + + if not dry_run: + for fid in parted.partitionFlag.keys(): + self.unset_flag(fid) + + for fid, flag in parted.partitionFlag.items(): + if flag in new_parted_flags: + self.set_flag(fid) + + self.disk.original_format.commit_to_disk() + @property def is_magic(self): if not self.disk or not self.disklabel_supported: diff --git a/tests/storage_tests/devices_test/partition_test.py b/tests/storage_tests/devices_test/partition_test.py index 73da87b43..8ac8ad55b 100644 --- a/tests/storage_tests/devices_test/partition_test.py +++ b/tests/storage_tests/devices_test/partition_test.py @@ -5,6 +5,7 @@ from unittest.mock import patch +import blivet from blivet.devices import DiskFile from blivet.devices import PartitionDevice from blivet.devicelibs.gpt import gpt_part_uuid_for_mountpoint @@ -13,6 +14,8 @@ from blivet.size import Size from blivet.util import sparsetmpfile +from ..storagetestcase import StorageTestCase + class PartitionDeviceTestCase(unittest.TestCase): @@ -266,3 +269,85 @@ def test_dev_part_type_gpt_autodiscover(self): flags.gpt_discoverable_partitions = True self.assertEqual(device.part_type_uuid, gpt_part_uuid_for_mountpoint("/home")) + + +class PartitionTestCase(StorageTestCase): + + _num_disks = 1 + + def setUp(self): + super().setUp() + + disks = [os.path.basename(vdev) for vdev in self.vdevs] + self.storage = blivet.Blivet() + self.storage.exclusive_disks = disks + self.storage.reset() + + # make sure only the targetcli disks are in the devicetree + for disk in self.storage.disks: + self.assertTrue(disk.path in self.vdevs) + self.assertIsNone(disk.format.type) + self.assertFalse(disk.children) + + def _clean_up(self): + self.storage.reset() + for disk in self.storage.disks: + if disk.path not in self.vdevs: + raise RuntimeError("Disk %s found in devicetree but not in disks created for tests" % disk.name) + self.storage.recursive_remove(disk) + + self.storage.do_it() + + return super()._clean_up() + + def test_parted_flags_configure_action(self): + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + + self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="msdos")) + + part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk]) + self.storage.create_device(part) + + blivet.partitioning.do_partitioning(self.storage) + + self.storage.do_it() + self.storage.reset() + + part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1") + self.assertIsNotNone(part) + + self.assertCountEqual(part.parted_flags, []) + + ac = blivet.deviceaction.ActionConfigureDevice(device=part, attr="parted_flags", + new_value=["boot"]) + self.storage.devicetree.actions.add(ac) + + self.assertCountEqual(part.parted_flags, ["boot"]) + + self.storage.do_it() + self.storage.reset() + + part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1") + self.assertIsNotNone(part) + + self.assertCountEqual(part.parted_flags, ["boot"]) + + with self.assertRaises(ValueError): + blivet.deviceaction.ActionConfigureDevice(device=part, attr="parted_flags", + new_value=["boot"]) + + ac = blivet.deviceaction.ActionConfigureDevice(device=part, attr="parted_flags", + new_value=["boot", "lvm"]) + + self.storage.devicetree.actions.add(ac) + + self.assertCountEqual(part.parted_flags, ["boot", "lvm"]) + + self.storage.do_it() + self.storage.reset() + + part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1") + self.assertIsNotNone(part) + + self.assertCountEqual(part.parted_flags, ["boot", "lvm"]) diff --git a/tests/storage_tests/storagetestcase.py b/tests/storage_tests/storagetestcase.py index 2d5cb21fe..0ffec3716 100644 --- a/tests/storage_tests/storagetestcase.py +++ b/tests/storage_tests/storagetestcase.py @@ -176,6 +176,7 @@ def delete_lio_device(dev_path): class StorageTestCase(unittest.TestCase): _disk_size = 2 * 1024**3 + _num_disks = 4 def setUp(self): self.addCleanup(self._clean_up) @@ -183,7 +184,7 @@ def setUp(self): self.vdevs = [] self._dev_files = [] - for _ in range(4): + for _ in range(self._num_disks): dev_file = create_sparse_tempfile("blivet_test", self._disk_size) self._dev_files.append(dev_file) try: diff --git a/tests/unit_tests/action_test.py b/tests/unit_tests/action_test.py index 8aefad2f0..e252ec70e 100644 --- a/tests/unit_tests/action_test.py +++ b/tests/unit_tests/action_test.py @@ -424,6 +424,7 @@ def test_action_obsoletes(self): # - obsoletes all but ActionConfigureDevice actions w/ lower id on the # same existing device with the same attribute being configured sdc1._rename = Mock() # XXX partitions are actually not renamable + sdc1.config_actions_map = {"name": "_rename"} configure_device_1 = ActionConfigureDevice(sdc1, "name", "new_name") configure_device_1.apply() configure_device_2 = ActionConfigureDevice(sdc1, "name", "new_name2")