Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Windows Secure Boot key upgrade tests #277

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "contrib/secureboot_objects"]
path = contrib/secureboot_objects
url = https://github.com/microsoft/secureboot_objects.git
4 changes: 2 additions & 2 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ def imported_vm(host, vm_ref):
else:
vm_orig = host.import_vm(vm_ref, host.main_sr_uuid(), use_cache=CACHE_IMPORTED_VM)

if CACHE_IMPORTED_VM:
if CACHE_IMPORTED_VM or is_uuid(vm_ref):
# Clone the VM before running tests, so that the original VM remains untouched
logging.info(">> Clone cached VM before running tests")
vm = vm_orig.clone()
Expand All @@ -372,7 +372,7 @@ def imported_vm(host, vm_ref):

yield vm
# teardown
if not is_uuid(vm_ref):
if CACHE_IMPORTED_VM or is_uuid(vm_ref):
logging.info("<< Destroy VM")
vm.destroy(verify=True)

Expand Down
1 change: 1 addition & 0 deletions contrib/secureboot_objects
Submodule secureboot_objects added at 058c7e
244 changes: 169 additions & 75 deletions lib/efi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import hashlib
import logging
import os
from pathlib import Path
import shutil
import struct
from datetime import datetime, timedelta
from tempfile import TemporaryDirectory
from tempfile import TemporaryDirectory, mkstemp
from typing import Iterable, Literal, Optional, Union
from uuid import UUID

from cryptography import x509
Expand All @@ -20,6 +22,59 @@
import lib.commands as commands


class _EfiGlobalTempdir:
_instance = None

def _safe_cleanup(self):
if self._instance is not None:
try:
self._instance.cleanup()
except OSError:
pass

def get(self):
if self._instance is None:
self._instance = TemporaryDirectory()
atexit.register(self._safe_cleanup)
return self._instance

def getfile(self, suffix=None, prefix=None):
fd, path = mkstemp(suffix=suffix, prefix=prefix, dir=self.get().name)
os.close(fd)
return path


_tempdir = _EfiGlobalTempdir()


class _SecureBootCertList:
_prefix = Path(__file__).parent / '../contrib/secureboot_objects/PreSignedObjects'

def kek_ms_2011(self):
return str(self._prefix / "KEK/Certificates/MicCorKEKCA2011_2011-06-24.der")

def kek_ms_2023(self):
return str(self._prefix / "KEK/Certificates/microsoft corporation kek 2k ca 2023.der")

def db_win_2011(self):
return str(self._prefix / "DB/Certificates/MicWinProPCA2011_2011-10-19.der")

def db_uefi_2011(self):
return str(self._prefix / "DB/Certificates/MicCorUEFCA2011_2011-06-27.der")

def db_win_2023(self):
return str(self._prefix / "DB/Certificates/windows uefi ca 2023.der")

def db_uefi_2023(self):
return str(self._prefix / "DB/Certificates/microsoft uefi ca 2023.der")

def db_oprom_2023(self):
return str(self._prefix / "DB/Certificates/microsoft option rom uefi ca 2023.der")


ms_certs = _SecureBootCertList()


class GUID(UUID):
def as_bytes(self):
return self.bytes_le
Expand Down Expand Up @@ -128,8 +183,11 @@ def get_secure_boot_guid(variable: str) -> GUID:
def cert_to_efi_sig_list(cert):
"""Return an ESL from a PEM cert."""
with open(cert, 'rb') as f:
pem = f.read()
cert = x509.load_pem_x509_certificate(pem)
cert_raw = f.read()
try:
cert = x509.load_pem_x509_certificate(cert_raw)
except ValueError:
cert = x509.load_der_x509_certificate(cert_raw)
der = cert.public_bytes(Encoding.DER)

signature_type = EFI_CERT_X509_GUID
Expand Down Expand Up @@ -164,7 +222,13 @@ def certs_to_sig_db(certs):
return db


def sign_efi_sig_db(sig_db, var, key, cert, time=None, guid=None):
def sign_efi_sig_db(
sig_db: bytes,
var: str,
key: str,
cert: str,
time: Optional[datetime] = None,
guid: Optional[GUID] = None):
"""Return a pkcs7 SignedData from a UEFI signature database."""
global p7_out

Expand Down Expand Up @@ -214,7 +278,7 @@ def sign_efi_sig_db(sig_db, var, key, cert, time=None, guid=None):
return create_auth2_header(p7, timestamp) + p7 + sig_db


def sign(payload, key_file, cert_file):
def sign(payload: bytes, key_file: str, cert_file: str):
"""Returns a signed PKCS7 of payload signed by key and cert."""
with open(key_file, 'rb') as f:
priv_key = serialization.load_pem_private_key(f.read(), password=None)
Expand All @@ -236,7 +300,7 @@ def sign(payload, key_file, cert_file):
)


def create_auth2_header(sig_db, timestamp):
def create_auth2_header(sig_db: bytes, timestamp: bytes):
"""Return an EFI_AUTHENTICATE_VARIABLE_2 from a signature database."""
length = len(sig_db) + WIN_CERTIFICATE_UEFI_GUID_offset
revision = 0x200
Expand Down Expand Up @@ -292,37 +356,90 @@ def pesign(key, cert, name, image):
return signed


class Certificate:
def __init__(self, pub: str, key: Optional[str]):
self.pub = pub
self.key = key

@classmethod
def self_signed(cls, common_name='XCP-ng Test Common Name'):
pub = _tempdir.getfile(suffix='.pem')
key = _tempdir.getfile(suffix='.pem')

commands.local_cmd([
'openssl', 'req', '-new', '-x509', '-newkey', 'rsa:2048',
'-subj', '/CN=%s/' % common_name, '-nodes', '-keyout',
key, '-sha256', '-days', '3650', '-out', pub
])

return cls(pub, key)

def sign_efi_sig_db(self, var: str, data: bytes, guid: Optional[GUID]):
assert self.key is not None
return sign_efi_sig_db(
data, var, self.key, self.pub, time=timestamp(), guid=guid
)

def copy(self):
newpub = _tempdir.getfile(suffix='.pem')
shutil.copyfile(self.pub, newpub)

newkey = None
if self.key is not None:
newkey = _tempdir.getfile(suffix='.pem')
shutil.copyfile(self.key, newkey)

return Certificate(newpub, newkey)


class EFIAuth:
def __init__(self, name, is_null=False):
if name not in SECURE_BOOT_VARIABLES:
raise RuntimeError(f"{name} is not a secure boot variable")
def __init__(
self,
name: Literal["PK", "KEK", "db", "dbx"],
owner_cert: Optional[Certificate] = None,
other_certs: Iterable[Union[Certificate, str]] = None):
assert name in SECURE_BOOT_VARIABLES
assert owner_cert is None or owner_cert.key is not None, "owner cert must have private key"
self.name = name
self.is_null = is_null
self.guid = get_secure_boot_guid(self.name)
self.key = ''
self.cert = Certificate()
self.tempdir = TemporaryDirectory(prefix=name + '_')
atexit.register(self.tempdir.cleanup)
self.efi_signature_list = self._get_efi_signature_list()
self.auth_data = None
self.auth = os.path.join(self.tempdir.name, '%s.auth' % self.name)
self._owner_cert = owner_cert
self._other_certs = list(other_certs or [])
self._efi_signature_list = self._get_efi_signature_list()
self._auth_data = None
self._auth = _tempdir.getfile(suffix='.auth')

@classmethod
def self_signed(
cls,
name: Literal["PK", "KEK", "db", "dbx"],
other_certs: Iterable[Union[Certificate, str]] = None):
return cls(name, owner_cert=Certificate.self_signed(name + " Owner"), other_certs=other_certs)

def is_signed(self):
return os.path.exists(self.auth)
return self._auth_data is not None

def sign_auth(self, other: 'EFIAuth'):
def auth_data(self):
assert self.is_signed()
return self._auth_data

def auth(self):
assert self.is_signed()
return self._auth

def sign_auth(self, to_be_signed: 'EFIAuth'):
"""
Sign another EFIAuth object.

The other EFIAuth's member `auth` will be set to
the path of the .auth file.
"""
other.auth_data = self.cert.sign_data(
other.name, other.efi_signature_list, other.guid
assert self._owner_cert is not None
to_be_signed._auth_data = self._owner_cert.sign_efi_sig_db(
to_be_signed.name, to_be_signed._efi_signature_list, to_be_signed.guid
)

with open(other.auth, 'wb') as f:
f.write(other.auth_data)
with open(to_be_signed._auth, 'wb') as f:
f.write(to_be_signed._auth_data)

def sign_image(self, image: str) -> str:
"""
Expand All @@ -335,19 +452,19 @@ def sign_image(self, image: str) -> str:

Returns path to signed image.
"""
assert self._owner_cert is not None
if shutil.which('sbsign'):
signed = get_signed_name(image)
commands.local_cmd([
'sbsign', '--key', self.cert.key, '--cert', self.cert.pub,
'sbsign', '--key', self._owner_cert.key, '--cert', self._owner_cert.pub,
image, '--output', signed
])
else:
signed = pesign(self.cert.key, self.cert.pub, self.name, image)
signed = pesign(self._owner_cert.key, self._owner_cert.pub, self.name, image)

return signed

@classmethod
def copy(cls, other, name=None):
def copy(self, name: Optional[Literal["PK", "KEK", "db", "dbx"]] = None):
"""
Make a copy of an existing EFIAuth object.

Expand All @@ -368,56 +485,33 @@ def copy(cls, other, name=None):
This is ONLY useful for creating a new handle.
"""
if name is None:
name = other.name
name = self.name

obj = cls(name=name, is_null=other.is_null)
obj.cert = other.cert.copy()
obj.efi_signature_list = other.efi_signature_list
copied = EFIAuth(
name=name,
owner_cert=self._owner_cert.copy(),
other_certs=self._other_certs.copy())
copied._efi_signature_list = self._efi_signature_list

if other.is_signed():
obj.auth_data = copy.copy(other.auth_data)
shutil.copyfile(other.auth, obj.auth)
if self.is_signed():
copied._auth_data = copy.copy(self._auth_data)
shutil.copyfile(self._auth, copied._auth)

return obj
return copied

def _get_efi_signature_list(self) -> str:
if self.is_null:
return b''
def _get_efi_signature_list(self) -> bytes:
certs = []
if self._owner_cert is not None:
certs.append(self._owner_cert.pub)
for other_cert in self._other_certs:
if isinstance(other_cert, str):
certs.append(other_cert)
elif isinstance(other_cert, Certificate):
certs.append(other_cert.pub)
else:
raise TypeError('other_cert is not Certificate or str')

return certs_to_sig_db(self.cert.pub)


class Certificate:
def __init__(self, common_name='XCP-ng Test Common Name', init_keys=True):
self.common_name = common_name
self.name = common_name.replace(' ', '_').lower()
self.tempdir = TemporaryDirectory(prefix='cert_' + self.name)
atexit.register(self.tempdir.cleanup)
self.key = os.path.join(self.tempdir.name, '%s.key' % self.name)
self.pub = os.path.join(self.tempdir.name, 'tmp.crt')

if init_keys:
commands.local_cmd([
'openssl', 'req', '-new', '-x509', '-newkey', 'rsa:2048',
'-subj', '/CN=%s/' % self.common_name, '-nodes', '-keyout',
self.key, '-sha256', '-days', '3650', '-out', self.pub
])

def sign_data(self, var, data, guid):
return sign_efi_sig_db(
data, var, self.key, self.pub, time=timestamp(), guid=guid
)

def _get_cert_path(self):
return os.path.join(
self.tempdir.name, '_'.join(self.common_name.split()) + '.crt'
)

def copy(self):
obj = Certificate(common_name=self.common_name, init_keys=False)
shutil.copyfile(self.key, obj.key)
shutil.copyfile(self.pub, obj.pub)
return obj
return certs_to_sig_db(certs)


def esl_from_auth_file(auth: str) -> bytes:
Expand All @@ -434,16 +528,16 @@ def esl_from_auth_file(auth: str) -> bytes:
return esl_from_auth_bytes(data)


def esl_from_auth_bytes(auth: bytes) -> bytes:
def esl_from_auth_bytes(auth_data: bytes) -> bytes:
"""
Return the ESL contained inside the AUTH2 structure.

Warning: This will break if used on any ESL containing certs of non-X509 GUID type.
All of the certs used in Secure Boot are X509 GUID type.
"""
return auth[auth.index(EFI_CERT_X509_GUID):]
return auth_data[auth_data.index(EFI_CERT_X509_GUID):]

def get_md5sum_from_auth(auth):
def get_md5sum_from_auth(auth: str):
return hashlib.md5(esl_from_auth_file(auth)).hexdigest()

if __name__ == '__main__':
Expand Down
Loading
Loading