diff --git a/blivet/devicelibs/crypto.py b/blivet/devicelibs/crypto.py index 603259bf1..a880972a5 100644 --- a/blivet/devicelibs/crypto.py +++ b/blivet/devicelibs/crypto.py @@ -47,6 +47,9 @@ "luks2": BlockDev.CryptoLUKSVersion.LUKS2} DEFAULT_LUKS_VERSION = "luks2" +OPAL_TYPES = {"luks2-hw-opal": BlockDev.CryptoLUKSHWEncryptionType.OPAL_HW_AND_SW, + "luks2-hw-opal-only": BlockDev.CryptoLUKSHWEncryptionType.OPAL_HW_ONLY} + DEFAULT_INTEGRITY_ALGORITHM = "crc32c" # from linux/drivers/md/dm-integrity.c diff --git a/blivet/formats/luks.py b/blivet/formats/luks.py index ae96d9010..f6517605a 100644 --- a/blivet/formats/luks.py +++ b/blivet/formats/luks.py @@ -102,6 +102,8 @@ def __init__(self, **kwargs): :type luks_sector_size: int :keyword subsystem: LUKS subsystem :type subsystem: str + :keyword opal_passphrase: OPAL admin passphrase + :type opal_passphrase: str .. note:: @@ -125,15 +127,15 @@ def __init__(self, **kwargs): self.label = kwargs.get("label") or None self.subsystem = kwargs.get("subsystem") or None - self.is_opal = self.subsystem == "HW-OPAL" + self.is_opal = self.luks_version in crypto.OPAL_TYPES.keys() - if self.luks_version == "luks2": + if self.luks_version.startswith("luks2"): self._header_size = crypto.LUKS2_METADATA_SIZE else: self._header_size = crypto.LUKS1_METADATA_SIZE self._min_size = self._header_size - if not self.exists and self.luks_version not in crypto.LUKS_VERSIONS.keys(): + if not self.exists and self.luks_version not in list(crypto.LUKS_VERSIONS.keys()) + list(crypto.OPAL_TYPES.keys()): raise ValueError("Unknown or unsupported LUKS version '%s'" % self.luks_version) if not self.exists: @@ -183,6 +185,8 @@ def __init__(self, **kwargs): if self.luks_sector_size and self.luks_version != "luks2": raise ValueError("Sector size argument is valid only for LUKS version 2.") + self.__opal_passphrase = kwargs.get("opal_passphrase") + def __repr__(self): s = DeviceFormat.__repr__(self) if self.__passphrase: @@ -225,6 +229,12 @@ def _set_passphrase(self, passphrase): passphrase = property(fset=_set_passphrase) + def _set_opal_passphrase(self, opal_passphrase): + """ Set the OPAL admin passphrase for this device. """ + self.__opal_passphrase = opal_passphrase + + opal_passphrase = property(fset=_set_opal_passphrase) + @property def has_key(self): return bool((self.__passphrase not in ["", None]) or @@ -343,7 +353,7 @@ def _create(self, **kwargs): type=self.type, status=self.status) super(LUKS, self)._create(**kwargs) # set up the event sync - if not self.pbkdf_args and self.luks_version == "luks2": + if not self.pbkdf_args and self.luks_version.startswith("luks2"): if luks_data.pbkdf_args: self.pbkdf_args = luks_data.pbkdf_args else: @@ -355,7 +365,7 @@ def _create(self, **kwargs): luks_data.pbkdf_args = self.pbkdf_args log.info("PBKDF arguments for LUKS2 not specified, using defaults with memory limit %s", mem_limit) - if not self.luks_sector_size and self.luks_version == "luks2": + if not self.luks_sector_size and self.luks_version.startswith("luks2"): self.luks_sector_size = crypto.get_optimal_luks_sector_size(self.device) if self.pbkdf_args: @@ -379,14 +389,29 @@ def _create(self, **kwargs): else: raise LUKSError("Passphrase or key file must be set for LUKS create") + if self.is_opal: + if not self.__opal_passphrase: + raise LUKSError("OPAL admin passphrase must be specified when creating LUKS HW-OPAL format") + opal_context = blockdev.CryptoKeyslotContext(passphrase=self.__opal_passphrase) + try: - blockdev.crypto.luks_format(self.device, - context=context, - cipher=self.cipher, - key_size=self.key_size, - min_entropy=self.min_luks_entropy, - luks_version=crypto.LUKS_VERSIONS[self.luks_version], - extra=extra) + if self.is_opal: + blockdev.crypto.opal_format(self.device, + context=context, + cipher=self.cipher, + key_size=self.key_size, + min_entropy=self.min_luks_entropy, + opal_context=opal_context, + hw_encryption=crypto.OPAL_TYPES[self.luks_version], + extra=extra) + else: + blockdev.crypto.luks_format(self.device, + context=context, + cipher=self.cipher, + key_size=self.key_size, + min_entropy=self.min_luks_entropy, + luks_version=crypto.LUKS_VERSIONS[self.luks_version], + extra=extra) except blockdev.CryptoError as e: raise LUKSError(e) diff --git a/blivet/populator/helpers/luks.py b/blivet/populator/helpers/luks.py index bb97757cc..4de15ec57 100644 --- a/blivet/populator/helpers/luks.py +++ b/blivet/populator/helpers/luks.py @@ -134,7 +134,10 @@ def _get_kwargs(self): except blockdev.CryptoError as e: log.warning("Failed to get information about LUKS format on %s: %s", self.device, str(e)) else: - kwargs["subsystem"] = info.subsystem + if info.hw_encryption == blockdev.CryptoLUKSHWEncryptionType.OPAL_HW_AND_SW: + kwargs["luks_version"] = "luks2-hw-opal" + elif info.hw_encryption == blockdev.CryptoLUKSHWEncryptionType.OPAL_HW_ONLY: + kwargs["luks_version"] = "luks2-hw-opal-only" return kwargs diff --git a/tests/unit_tests/formats_tests/luks_test.py b/tests/unit_tests/formats_tests/luks_test.py index 771c831fa..d70cc3f8f 100644 --- a/tests/unit_tests/formats_tests/luks_test.py +++ b/tests/unit_tests/formats_tests/luks_test.py @@ -134,6 +134,10 @@ def test_luks_opal(self): self.assertFalse(fmt.is_opal) self.assertFalse(fmt.protected) - fmt = LUKS(subsystem="HW-OPAL") + fmt = LUKS(luks_version="luks2-hw-opal") + self.assertTrue(fmt.is_opal) + self.assertTrue(fmt.protected) + + fmt = LUKS(luks_version="luks2-hw-opal-only") self.assertTrue(fmt.is_opal) self.assertTrue(fmt.protected)