Skip to content

Commit

Permalink
Merge pull request #1312 from vojtechtrefny/main_parted-flags-configure
Browse files Browse the repository at this point in the history
Allow setting parted partition flags using ActionConfigureDevice
  • Loading branch information
vojtechtrefny authored Nov 5, 2024
2 parents 50a96d5 + 111711a commit 02d26a8
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 1 deletion.
46 changes: 46 additions & 0 deletions blivet/devices/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class PartitionDevice(StorageDevice):
_resizable = True
default_size = DEFAULT_PART_SIZE

config_actions_map = {"parted_flags": "_set_parted_flags"}

def __init__(self, name, fmt=None, uuid=None,
size=None, grow=False, maxsize=None, start=None, end=None,
major=None, minor=None, bootable=None,
Expand Down Expand Up @@ -163,6 +165,8 @@ def __init__(self, name, fmt=None, uuid=None,
self._part_type_name = None
self._mountpoint = mountpoint

self._parted_flags = []

if not exists and size is None:
if start is not None and end is not None:
size = Size(0)
Expand Down Expand Up @@ -609,6 +613,48 @@ def unset_flag(self, flag):

self.parted_partition.unsetFlag(flag)

@property
def parted_flags(self):
""" Parted flags (as list of strings) currently set for this partition """
if self._parted_flags:
return self._parted_flags

parted_flags = []

if not self.parted_partition:
return parted_flags

for fid, flag in parted.partitionFlag.items():
if self.get_flag(fid):
parted_flags.append(flag)

self._parted_flags = parted_flags
return self._parted_flags

@parted_flags.setter
def parted_flags(self, parted_flags):
self._parted_flags = parted_flags

def _set_parted_flags(self, new_parted_flags, old_parted_flags, dry_run):
if not self.exists:
raise ValueError("device has not been created")

if not self.parted_partition:
raise ValueError("cannot set flags %s" % self.name)

if sorted(new_parted_flags) == sorted(old_parted_flags):
raise ValueError("flags are already set to %s" % ", ".join(new_parted_flags))

if not dry_run:
for fid in parted.partitionFlag.keys():
self.unset_flag(fid)

for fid, flag in parted.partitionFlag.items():
if flag in new_parted_flags:
self.set_flag(fid)

self.disk.original_format.commit_to_disk()

@property
def is_magic(self):
if not self.disk or not self.disklabel_supported:
Expand Down
85 changes: 85 additions & 0 deletions tests/storage_tests/devices_test/partition_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from unittest.mock import patch

import blivet
from blivet.devices import DiskFile
from blivet.devices import PartitionDevice
from blivet.devicelibs.gpt import gpt_part_uuid_for_mountpoint
Expand All @@ -13,6 +14,8 @@
from blivet.size import Size
from blivet.util import sparsetmpfile

from ..storagetestcase import StorageTestCase


class PartitionDeviceTestCase(unittest.TestCase):

Expand Down Expand Up @@ -266,3 +269,85 @@ def test_dev_part_type_gpt_autodiscover(self):
flags.gpt_discoverable_partitions = True
self.assertEqual(device.part_type_uuid,
gpt_part_uuid_for_mountpoint("/home"))


class PartitionTestCase(StorageTestCase):

_num_disks = 1

def setUp(self):
super().setUp()

disks = [os.path.basename(vdev) for vdev in self.vdevs]
self.storage = blivet.Blivet()
self.storage.exclusive_disks = disks
self.storage.reset()

# make sure only the targetcli disks are in the devicetree
for disk in self.storage.disks:
self.assertTrue(disk.path in self.vdevs)
self.assertIsNone(disk.format.type)
self.assertFalse(disk.children)

def _clean_up(self):
self.storage.reset()
for disk in self.storage.disks:
if disk.path not in self.vdevs:
raise RuntimeError("Disk %s found in devicetree but not in disks created for tests" % disk.name)
self.storage.recursive_remove(disk)

self.storage.do_it()

return super()._clean_up()

def test_parted_flags_configure_action(self):
disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
self.assertIsNotNone(disk)

self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="msdos"))

part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk])
self.storage.create_device(part)

blivet.partitioning.do_partitioning(self.storage)

self.storage.do_it()
self.storage.reset()

part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1")
self.assertIsNotNone(part)

self.assertCountEqual(part.parted_flags, [])

ac = blivet.deviceaction.ActionConfigureDevice(device=part, attr="parted_flags",
new_value=["boot"])
self.storage.devicetree.actions.add(ac)

self.assertCountEqual(part.parted_flags, ["boot"])

self.storage.do_it()
self.storage.reset()

part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1")
self.assertIsNotNone(part)

self.assertCountEqual(part.parted_flags, ["boot"])

with self.assertRaises(ValueError):
blivet.deviceaction.ActionConfigureDevice(device=part, attr="parted_flags",
new_value=["boot"])

ac = blivet.deviceaction.ActionConfigureDevice(device=part, attr="parted_flags",
new_value=["boot", "lvm"])

self.storage.devicetree.actions.add(ac)

self.assertCountEqual(part.parted_flags, ["boot", "lvm"])

self.storage.do_it()
self.storage.reset()

part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1")
self.assertIsNotNone(part)

self.assertCountEqual(part.parted_flags, ["boot", "lvm"])
3 changes: 2 additions & 1 deletion tests/storage_tests/storagetestcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,15 @@ def delete_lio_device(dev_path):
class StorageTestCase(unittest.TestCase):

_disk_size = 2 * 1024**3
_num_disks = 4

def setUp(self):
self.addCleanup(self._clean_up)

self.vdevs = []
self._dev_files = []

for _ in range(4):
for _ in range(self._num_disks):
dev_file = create_sparse_tempfile("blivet_test", self._disk_size)
self._dev_files.append(dev_file)
try:
Expand Down
1 change: 1 addition & 0 deletions tests/unit_tests/action_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ def test_action_obsoletes(self):
# - obsoletes all but ActionConfigureDevice actions w/ lower id on the
# same existing device with the same attribute being configured
sdc1._rename = Mock() # XXX partitions are actually not renamable
sdc1.config_actions_map = {"name": "_rename"}
configure_device_1 = ActionConfigureDevice(sdc1, "name", "new_name")
configure_device_1.apply()
configure_device_2 = ActionConfigureDevice(sdc1, "name", "new_name2")
Expand Down

0 comments on commit 02d26a8

Please sign in to comment.