diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 78777f461..0f9f75116 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -12,6 +12,10 @@ updates: interval: daily open-pull-requests-limit: 99 rebase-strategy: "disabled" + groups: + actions: + patterns: + - "*" - package-ecosystem: github-actions directory: .github/actions/upload-coverage/ @@ -19,3 +23,7 @@ updates: interval: daily open-pull-requests-limit: 99 rebase-strategy: "disabled" + groups: + actions: + patterns: + - "*" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7208be995..168c0b617 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -91,7 +91,7 @@ jobs: - run: pip install coverage[toml] - name: download coverage data - uses: actions/download-artifact@7a1cd3216ca9260cd8022db641d960b1db4d1be4 # v4.0.0 + uses: actions/download-artifact@f44cd7b40bfd40b6aa1cc1b9b5b7bf03d3c67110 # v4.1.0 with: path: all-artifacts/ diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml index cce578b98..3342333a7 100644 --- a/.github/workflows/conformance.yml +++ b/.github/workflows/conformance.yml @@ -22,7 +22,7 @@ jobs: - name: install sigstore-python run: python -m pip install . - - uses: sigstore/sigstore-conformance@c8d17eb7ee884cf86b93a3a3f471648fb0a83819 # v0.0.9 + - uses: sigstore/sigstore-conformance@7375951316d6b28d07f7406c01e1dc7de2a75ce7 # v0.0.10 with: entrypoint: ${{ github.workspace }}/test/integration/sigstore-python-conformance - xfail: "test_verify_with_trust_root" # see issue 821 + xfail: "test_verify_with_trust_root test_verify_dsse_bundle_with_trust_root" # see issue 821 diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 964c2b3c3..33690672a 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -28,7 +28,7 @@ jobs: make doc - name: upload docs artifact - uses: actions/upload-pages-artifact@a753861a5debcf57bf8b404356158c8e1e33150c # v2.0.0 + uses: actions/upload-pages-artifact@0252fc4ba7626f0298f0cf00902a25c6afc77fa8 # v3.0.0 with: path: ./html/ @@ -49,4 +49,4 @@ jobs: url: ${{ steps.deployment.outputs.page_url }} steps: - id: deployment - uses: actions/deploy-pages@13b55b33dd8996121833dbc1db458c793a334630 # v3.0.1 + uses: actions/deploy-pages@7a9bd943aa5e5175aeb8502edcc6c1c02d398e10 # v4.0.2 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a0166eee7..1f20dd34e 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -119,7 +119,7 @@ jobs: id-token: write steps: - name: Download artifacts directories # goes to current working directory - uses: actions/download-artifact@7a1cd3216ca9260cd8022db641d960b1db4d1be4 # v4.0.0 + uses: actions/download-artifact@f44cd7b40bfd40b6aa1cc1b9b5b7bf03d3c67110 # v4.1.0 - name: publish uses: pypa/gh-action-pypi-publish@2f6f737ca5f74c637829c0f5c3acd0e29ea5e8bf # v1.8.11 @@ -134,7 +134,7 @@ jobs: contents: write steps: - name: Download artifacts directories # goes to current working directory - uses: actions/download-artifact@7a1cd3216ca9260cd8022db641d960b1db4d1be4 # v4.0.0 + uses: actions/download-artifact@f44cd7b40bfd40b6aa1cc1b9b5b7bf03d3c67110 # v4.1.0 - name: Upload artifacts to github # Confusingly, this action also supports updating releases, not diff --git a/pyproject.toml b/pyproject.toml index 2fd4f74ef..30757fda9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,7 +60,7 @@ lint = [ "mypy ~= 1.1", # NOTE(ww): ruff is under active development, so we pin conservatively here # and let Dependabot periodically perform this update. - "ruff < 0.1.9", + "ruff < 0.1.11", "types-requests", "types-protobuf", "types-pyOpenSSL", diff --git a/sigstore/_cli.py b/sigstore/_cli.py index 2d88e1e3b..0b08c09e6 100644 --- a/sigstore/_cli.py +++ b/sigstore/_cli.py @@ -40,7 +40,7 @@ RekorClient, RekorKeyring, ) -from sigstore._internal.tuf import TrustUpdater +from sigstore._internal.trustroot import TrustedRoot from sigstore._utils import PEMCert from sigstore.errors import Error from sigstore.oidc import ( @@ -650,16 +650,16 @@ def _sign(args: argparse.Namespace) -> None: elif args.fulcio_url == DEFAULT_FULCIO_URL and args.rekor_url == DEFAULT_REKOR_URL: signing_ctx = SigningContext.production() else: - # Assume "production" keys if none are given as arguments - updater = TrustUpdater.production() + # Assume "production" trust root if no keys are given as arguments + trusted_root = TrustedRoot.production() if args.ctfe_pem is not None: ctfe_keys = [args.ctfe_pem.read()] else: - ctfe_keys = updater.get_ctfe_keys() + ctfe_keys = trusted_root.get_ctfe_keys() if args.rekor_root_pubkey is not None: rekor_keys = [args.rekor_root_pubkey.read()] else: - rekor_keys = updater.get_rekor_keys() + rekor_keys = trusted_root.get_rekor_keys() ct_keyring = CTKeyring(Keyring(ctfe_keys)) rekor_keyring = RekorKeyring(Keyring(rekor_keys)) @@ -828,8 +828,8 @@ def _collect_verification_state( if args.rekor_root_pubkey is not None: rekor_keys = [args.rekor_root_pubkey.read()] else: - updater = TrustUpdater.production() - rekor_keys = updater.get_rekor_keys() + trusted_root = TrustedRoot.production() + rekor_keys = trusted_root.get_rekor_keys() verifier = Verifier( rekor=RekorClient( diff --git a/sigstore/_internal/rekor/client.py b/sigstore/_internal/rekor/client.py index 90706800d..20c9a62d5 100644 --- a/sigstore/_internal/rekor/client.py +++ b/sigstore/_internal/rekor/client.py @@ -30,7 +30,7 @@ from sigstore._internal.ctfe import CTKeyring from sigstore._internal.keyring import Keyring -from sigstore._internal.tuf import TrustUpdater +from sigstore._internal.trustroot import TrustedRoot from sigstore.transparency import LogEntry logger = logging.getLogger(__name__) @@ -249,14 +249,14 @@ def __del__(self) -> None: self.session.close() @classmethod - def production(cls, updater: TrustUpdater) -> RekorClient: + def production(cls, trust_root: TrustedRoot) -> RekorClient: """ Returns a `RekorClient` populated with the default Rekor production instance. - updater must be a `TrustUpdater` for the production TUF repository. + trust_root must be a `TrustedRoot` for the production TUF repository. """ - rekor_keys = updater.get_rekor_keys() - ctfe_keys = updater.get_ctfe_keys() + rekor_keys = trust_root.get_rekor_keys() + ctfe_keys = trust_root.get_ctfe_keys() return cls( DEFAULT_REKOR_URL, @@ -265,14 +265,14 @@ def production(cls, updater: TrustUpdater) -> RekorClient: ) @classmethod - def staging(cls, updater: TrustUpdater) -> RekorClient: + def staging(cls, trust_root: TrustedRoot) -> RekorClient: """ Returns a `RekorClient` populated with the default Rekor staging instance. - updater must be a `TrustUpdater` for the staging TUF repository. + trust_root must be a `TrustedRoot` for the staging TUF repository. """ - rekor_keys = updater.get_rekor_keys() - ctfe_keys = updater.get_ctfe_keys() + rekor_keys = trust_root.get_rekor_keys() + ctfe_keys = trust_root.get_ctfe_keys() return cls( STAGING_REKOR_URL, diff --git a/sigstore/_internal/trustroot.py b/sigstore/_internal/trustroot.py new file mode 100644 index 000000000..c317e8b3a --- /dev/null +++ b/sigstore/_internal/trustroot.py @@ -0,0 +1,150 @@ +# Copyright 2023 The Sigstore Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Trust root management for sigstore-python. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterable + +from cryptography.x509 import Certificate, load_der_x509_certificate +from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange +from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( + CertificateAuthority, + TransparencyLogInstance, +) +from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( + TrustedRoot as _TrustedRoot, +) + +from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater +from sigstore.errors import MetadataError + + +def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool: + """ + Given a `period`, checks that the the current time is not before `start`. If + `allow_expired` is `False`, also checks that the current time is not after + `end`. + """ + now = datetime.now(timezone.utc) + + # If there was no validity period specified, the key is always valid. + if not period: + return True + + # Active: if the current time is before the starting period, we are not yet + # valid. + if now < period.start: + return False + + # If we want Expired keys, the key is valid at this point. Otherwise, check + # that we are within range. + return allow_expired or (period.end is None or now <= period.end) + + +class TrustedRoot(_TrustedRoot): + """Complete set of trusted entities for a Sigstore client""" + + @classmethod + def from_file(cls, path: str) -> "TrustedRoot": + """Create a new trust root from file""" + tr: TrustedRoot = cls().from_json(Path(path).read_bytes()) + return tr + + @classmethod + def from_tuf(cls, url: str, offline: bool = False) -> "TrustedRoot": + """Create a new trust root from a TUF repository. + + If `offline`, will use trust root in local TUF cache. Otherwise will + update the trust root from remote TUF repository. + """ + path = TrustUpdater(url, offline).get_trusted_root_path() + return cls.from_file(path) + + @classmethod + def production(cls, offline: bool = False) -> "TrustedRoot": + """Create new trust root from Sigstore production TUF repository. + + If `offline`, will use trust root in local TUF cache. Otherwise will + update the trust root from remote TUF repository. + """ + return cls.from_tuf(DEFAULT_TUF_URL, offline) + + @classmethod + def staging(cls, offline: bool = False) -> "TrustedRoot": + """Create new trust root from Sigstore staging TUF repository. + + If `offline`, will use trust root in local TUF cache. Otherwise will + update the trust root from remote TUF repository. + """ + return cls.from_tuf(STAGING_TUF_URL, offline) + + @staticmethod + def _get_tlog_keys(tlogs: list[TransparencyLogInstance]) -> Iterable[bytes]: + """Return public key contents given transparency log instances.""" + + for key in tlogs: + if not _is_timerange_valid(key.public_key.valid_for, allow_expired=False): + continue + key_bytes = key.public_key.raw_bytes + if key_bytes: + yield key_bytes + + @staticmethod + def _get_ca_keys( + cas: list[CertificateAuthority], *, allow_expired: bool + ) -> Iterable[bytes]: + """Return public key contents given certificate authorities.""" + + for ca in cas: + if not _is_timerange_valid(ca.valid_for, allow_expired=allow_expired): + continue + for cert in ca.cert_chain.certificates: + yield cert.raw_bytes + + def get_ctfe_keys(self) -> list[bytes]: + """Return the active CTFE public keys contents.""" + ctfes: list[bytes] = list(self._get_tlog_keys(self.ctlogs)) + if not ctfes: + raise MetadataError("Active CTFE keys not found in trusted root") + return ctfes + + def get_rekor_keys(self) -> list[bytes]: + """Return the rekor public key content.""" + keys: list[bytes] = list(self._get_tlog_keys(self.tlogs)) + + if len(keys) != 1: + raise MetadataError("Did not find one active Rekor key in trusted root") + return keys + + def get_fulcio_certs(self) -> list[Certificate]: + """Return the Fulcio certificates.""" + + certs: list[Certificate] + + # Return expired certificates too: they are expired now but may have + # been active when the certificate was used to sign. + certs = [ + load_der_x509_certificate(c) + for c in self._get_ca_keys(self.certificate_authorities, allow_expired=True) + ] + + if not certs: + raise MetadataError("Fulcio certificates not found in trusted root") + return certs diff --git a/sigstore/_internal/tuf.py b/sigstore/_internal/tuf.py index eeafccad3..ccfc1ab53 100644 --- a/sigstore/_internal/tuf.py +++ b/sigstore/_internal/tuf.py @@ -19,25 +19,16 @@ from __future__ import annotations import logging -from datetime import datetime, timezone from functools import lru_cache from pathlib import Path -from typing import Iterable from urllib import parse import appdirs -from cryptography.x509 import Certificate, load_der_x509_certificate -from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange -from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( - CertificateAuthority, - TransparencyLogInstance, - TrustedRoot, -) from tuf.api import exceptions as TUFExceptions from tuf.ngclient import RequestsFetcher, Updater from sigstore._utils import read_embedded -from sigstore.errors import MetadataError, RootError, TUFError +from sigstore.errors import RootError, TUFError logger = logging.getLogger(__name__) @@ -73,28 +64,6 @@ def _get_dirs(url: str) -> tuple[Path, Path]: return (tuf_data_dir / repo_base), (tuf_cache_dir / repo_base) -def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool: - """ - Given a `period`, checks that the the current time is not before `start`. If - `allow_expired` is `False`, also checks that the current time is not after - `end`. - """ - now = datetime.now(timezone.utc) - - # If there was no validity period specified, the key is always valid. - if not period: - return True - - # Active: if the current time is before the starting period, we are not yet - # valid. - if now < period.start: - return False - - # If we want Expired keys, the key is valid at this point. Otherwise, check - # that we are within range. - return allow_expired or (period.end is None or now <= period.end) - - class TrustUpdater: """Internal trust root (certificates and keys) downloader. @@ -106,12 +75,15 @@ class TrustUpdater: production and staging instances) in the application resources. """ - def __init__(self, url: str) -> None: + def __init__(self, url: str, offline: bool = False) -> None: """ Create a new `TrustUpdater`, pulling from the given `url`. The URL is expected to match one of `sigstore-python`'s known TUF roots, i.e. for the production or staging Sigstore TUF repos. + + If not `offline`, TrustUpdater will update the TUF metadata from + the remote repository. """ self._repo_url = url self._metadata_dir, self._targets_dir = _get_dirs(url) @@ -151,124 +123,40 @@ def __init__(self, url: str) -> None: logger.debug(f"TUF metadata: {self._metadata_dir}") logger.debug(f"TUF targets cache: {self._targets_dir}") - @classmethod - def production(cls) -> TrustUpdater: - """ - Returns a `TrustUpdater` for the Sigstore production instances. - """ - return cls(DEFAULT_TUF_URL) - - @classmethod - def staging(cls) -> TrustUpdater: - """ - Returns a `TrustUpdater` for the Sigstore staging instances. - """ - return cls(STAGING_TUF_URL) + self._updater: None | Updater = None + if not offline: + # Initialize and update the toplevel TUF metadata + self._updater = Updater( + metadata_dir=str(self._metadata_dir), + metadata_base_url=self._repo_url, + target_base_url=parse.urljoin(f"{self._repo_url}/", "targets/"), + target_dir=str(self._targets_dir), + fetcher=_get_fetcher(), + ) + try: + self._updater.refresh() + except Exception as e: + raise TUFError("Failed to refresh TUF metadata") from e @lru_cache() - def _updater(self) -> Updater: - """Initialize and update the toplevel TUF metadata""" - updater = Updater( - metadata_dir=str(self._metadata_dir), - metadata_base_url=self._repo_url, - target_base_url=parse.urljoin(f"{self._repo_url}/", "targets/"), - target_dir=str(self._targets_dir), - fetcher=_get_fetcher(), - ) - - # NOTE: we would like to avoid refresh if the toplevel metadata is valid. - # https://github.com/theupdateframework/python-tuf/issues/2225 - try: - updater.refresh() - except Exception as e: - raise TUFError("Failed to refresh TUF metadata") from e - - return updater + def get_trusted_root_path(self) -> str: + """Return local path to currently valid trusted root file""" + if not self._updater: + logger.debug("Using unverified trusted root from cache") + return str(self._targets_dir / "trusted_root.json") - @lru_cache() - def _get_trusted_root(self) -> TrustedRoot: - root_info = self._updater().get_targetinfo("trusted_root.json") + root_info = self._updater.get_targetinfo("trusted_root.json") if root_info is None: raise TUFError("Unsupported TUF configuration: no trusted root") - path = self._updater().find_cached_target(root_info) + path = self._updater.find_cached_target(root_info) if path is None: try: - path = self._updater().download_target(root_info) + path = self._updater.download_target(root_info) except ( TUFExceptions.DownloadError, TUFExceptions.RepositoryError, ) as e: raise TUFError("Failed to download trusted key bundle") from e - logger.debug("Found trusted root") - return TrustedRoot().from_json(Path(path).read_bytes()) - - def _get_tlog_keys(self, tlogs: list[TransparencyLogInstance]) -> Iterable[bytes]: - """Return public key contents given transparency log instances.""" - - for key in tlogs: - if not _is_timerange_valid(key.public_key.valid_for, allow_expired=False): - continue - key_bytes = key.public_key.raw_bytes - if key_bytes: - yield key_bytes - - def _get_ca_keys( - self, cas: list[CertificateAuthority], *, allow_expired: bool - ) -> Iterable[bytes]: - """Return public key contents given certificate authorities.""" - - for ca in cas: - if not _is_timerange_valid(ca.valid_for, allow_expired=allow_expired): - continue - for cert in ca.cert_chain.certificates: - yield cert.raw_bytes - - def get_ctfe_keys(self) -> list[bytes]: - """Return the active CTFE public keys contents. - - May download files from the remote repository. - """ - ctfes: list[bytes] - - trusted_root = self._get_trusted_root() - ctfes = list(self._get_tlog_keys(trusted_root.ctlogs)) - - if not ctfes: - raise MetadataError("CTFE keys not found in TUF metadata") - return ctfes - - def get_rekor_keys(self) -> list[bytes]: - """Return the rekor public key content. - - May download files from the remote repository. - """ - keys: list[bytes] - - trusted_root = self._get_trusted_root() - keys = list(self._get_tlog_keys(trusted_root.tlogs)) - - if len(keys) != 1: - raise MetadataError("Did not find one active Rekor key in TUF metadata") - return keys - - def get_fulcio_certs(self) -> list[Certificate]: - """Return the Fulcio certificates. - - May download files from the remote repository. - """ - certs: list[Certificate] - - trusted_root = self._get_trusted_root() - # Return expired certificates too: they are expired now but may have - # been active when the certificate was used to sign. - certs = [ - load_der_x509_certificate(c) - for c in self._get_ca_keys( - trusted_root.certificate_authorities, allow_expired=True - ) - ] - - if not certs: - raise MetadataError("Fulcio certificates not found in TUF metadata") - return certs + logger.debug("Found and verified trusted root") + return path diff --git a/sigstore/sign.py b/sigstore/sign.py index 0ab9cec02..fdb63ee8d 100644 --- a/sigstore/sign.py +++ b/sigstore/sign.py @@ -82,7 +82,7 @@ ) from sigstore._internal.rekor.client import RekorClient from sigstore._internal.sct import verify_sct -from sigstore._internal.tuf import TrustUpdater +from sigstore._internal.trustroot import TrustedRoot from sigstore._utils import B64Str, PEMCert, sha256_streaming from sigstore.oidc import ExpiredIdentity, IdentityToken from sigstore.transparency import LogEntry @@ -294,8 +294,8 @@ def production(cls) -> SigningContext: """ Return a `SigningContext` instance configured against Sigstore's production-level services. """ - updater = TrustUpdater.production() - rekor = RekorClient.production(updater) + trust_root = TrustedRoot.production() + rekor = RekorClient.production(trust_root) return cls( fulcio=FulcioClient.production(), rekor=rekor, @@ -306,8 +306,8 @@ def staging(cls) -> SigningContext: """ Return a `SignerContext` instance configured against Sigstore's staging-level services. """ - updater = TrustUpdater.staging() - rekor = RekorClient.staging(updater) + trust_root = TrustedRoot.staging() + rekor = RekorClient.staging(trust_root) return cls( fulcio=FulcioClient.staging(), rekor=rekor, diff --git a/sigstore/verify/verifier.py b/sigstore/verify/verifier.py index e783542b9..999e40b27 100644 --- a/sigstore/verify/verifier.py +++ b/sigstore/verify/verifier.py @@ -47,7 +47,7 @@ ) from sigstore._internal.rekor.client import RekorClient from sigstore._internal.set import InvalidSETError, verify_set -from sigstore._internal.tuf import TrustUpdater +from sigstore._internal.trustroot import TrustedRoot from sigstore._utils import B64Str, HexStr from sigstore.verify.models import InvalidRekorEntry as InvalidRekorEntryError from sigstore.verify.models import RekorEntryMissing as RekorEntryMissingError @@ -126,10 +126,10 @@ def production(cls) -> Verifier: """ Return a `Verifier` instance configured against Sigstore's production-level services. """ - updater = TrustUpdater.production() + trust_root = TrustedRoot.production() return cls( - rekor=RekorClient.production(updater), - fulcio_certificate_chain=updater.get_fulcio_certs(), + rekor=RekorClient.production(trust_root), + fulcio_certificate_chain=trust_root.get_fulcio_certs(), ) @classmethod @@ -137,10 +137,10 @@ def staging(cls) -> Verifier: """ Return a `Verifier` instance configured against Sigstore's staging-level services. """ - updater = TrustUpdater.staging() + trust_root = TrustedRoot.staging() return cls( - rekor=RekorClient.staging(updater), - fulcio_certificate_chain=updater.get_fulcio_certs(), + rekor=RekorClient.staging(trust_root), + fulcio_certificate_chain=trust_root.get_fulcio_certs(), ) def verify( diff --git a/test/unit/conftest.py b/test/unit/conftest.py index b8ec60dba..fa71dd476 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -124,6 +124,10 @@ def asset(self, name: str): return (_TUF_ASSETS / name).read_bytes() def target(self, name: str): + path = self.target_path(name) + return path.read_bytes() if path else None + + def target_path(self, name: str) -> Path: # Since TUF contains both sha256 and sha512 prefixed targets, filter # out the sha512 ones. matches = filter( @@ -137,7 +141,7 @@ def target(self, name: str): raise Exception(f"Unable to match {name} in targets/") from e if next(matches, None) is None: - return path.read_bytes() + return path return None return TUFAsset() diff --git a/test/unit/internal/test_tuf.py b/test/unit/internal/test_trust_root.py similarity index 55% rename from test/unit/internal/test_tuf.py rename to test/unit/internal/test_trust_root.py index 36c63d4e3..cf243bef8 100644 --- a/test/unit/internal/test_tuf.py +++ b/test/unit/internal/test_trust_root.py @@ -16,73 +16,86 @@ import os from datetime import datetime, timedelta, timezone -import pretend import pytest from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from cryptography.x509 import load_pem_x509_certificate from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange -from sigstore._internal.tuf import TrustUpdater, _is_timerange_valid +from sigstore._internal.trustroot import TrustedRoot, _is_timerange_valid from sigstore._utils import load_der_public_key, load_pem_public_key from sigstore.errors import RootError -def test_updater_staging_caches_and_requests(mock_staging_tuf, tuf_dirs): +def test_trust_root_tuf_caches_and_requests(mock_staging_tuf, tuf_dirs): # start with empty target cache, empty local metadata dir data_dir, cache_dir = tuf_dirs - # keep track of successful and failed requests TrustUpdater makes + # keep track of requests the TrustUpdater invoked by TrustedRoot makes reqs, fail_reqs = mock_staging_tuf - updater = TrustUpdater.staging() - # Expect root.json bootstrapped from _store - assert sorted(os.listdir(data_dir)) == ["root.json"] - # Expect no requests happened - assert reqs == {} - assert fail_reqs == {} - - updater.get_ctfe_keys() - # Expect local metadata to now contain all top-level metadata files + trust_root = TrustedRoot.staging() + # metadata was "downloaded" from staging expected = ["root.json", "snapshot.json", "targets.json", "timestamp.json"] assert sorted(os.listdir(data_dir)) == expected - # Expect requests of top-level metadata, and the ctfe targets + + # Expect requests of top-level metadata (and 404 for the next root version) + # Don't expect trusted_root.json request as it's cached already expected_requests = { "2.root.json": 1, "2.snapshot.json": 1, "2.targets.json": 1, "timestamp.json": 1, - # trusted_root.json should not be requested, as it is cached locally } expected_fail_reqs = {"3.root.json": 1} - assert reqs == expected_requests - # Expect 404 from the next root version assert fail_reqs == expected_fail_reqs - updater.get_rekor_keys() - # Expect no requests, as the `get_ctfe_keys` should have populated the bundled trust root - assert reqs == expected_requests - assert fail_reqs == expected_fail_reqs + trust_root.get_ctfe_keys() + trust_root.get_rekor_keys() - # New Updater instance, same cache dirs - updater = TrustUpdater.staging() - # Expect no requests happened + # no new requests assert reqs == expected_requests assert fail_reqs == expected_fail_reqs - updater.get_ctfe_keys() + # New trust root (and TrustUpdater instance), same cache dirs + trust_root = TrustedRoot.staging() + # Expect new timestamp and root requests expected_requests["timestamp.json"] += 1 expected_fail_reqs["3.root.json"] += 1 assert reqs == expected_requests assert fail_reqs == expected_fail_reqs - updater.get_rekor_keys() + trust_root.get_ctfe_keys() + trust_root.get_rekor_keys() # Expect no requests assert reqs == expected_requests assert fail_reqs == expected_fail_reqs +def test_trust_root_tuf_offline(mock_staging_tuf, tuf_dirs): + # start with empty target cache, empty local metadata dir + data_dir, cache_dir = tuf_dirs + + # keep track of requests the TrustUpdater invoked by TrustedRoot makes + reqs, fail_reqs = mock_staging_tuf + + trust_root = TrustedRoot.staging(offline=True) + + # Only the embedded root is in local TUF metadata, nothing is downloaded + expected = ["root.json"] + assert sorted(os.listdir(data_dir)) == expected + assert reqs == {} + assert fail_reqs == {} + + trust_root.get_ctfe_keys() + trust_root.get_rekor_keys() + + # Still no requests + assert reqs == {} + assert fail_reqs == {} + + def test_is_timerange_valid(): def range_from(offset_lower=0, offset_upper=0): base = datetime.now(timezone.utc) @@ -112,7 +125,7 @@ def range_from(offset_lower=0, offset_upper=0): ) # Valid: 1 ago, 1 ago -def test_bundled_get(monkeypatch, mock_staging_tuf, tuf_asset): +def test_trust_root_bundled_get(monkeypatch, mock_staging_tuf, tuf_asset): # We don't strictly need to re-encode these keys as they are already DER, # but by doing so we are also validating the keys structurally. def _der_keys(keys): @@ -131,19 +144,15 @@ def _pem_keys(keys): for k in keys ] - updater = TrustUpdater.staging() - - assert _der_keys(updater.get_ctfe_keys()) == _pem_keys( + ctfe_keys = _pem_keys( [ tuf_asset.target("ctfe.pub"), tuf_asset.target("ctfe_2022.pub"), tuf_asset.target("ctfe_2022_2.pub"), ] ) - assert _der_keys(updater.get_rekor_keys()) == _pem_keys( - [tuf_asset.target("rekor.pub")] - ) - assert updater.get_fulcio_certs() == [ + rekor_keys = _pem_keys([tuf_asset.target("rekor.pub")]) + fulcio_certs = [ load_pem_x509_certificate(c) for c in [ tuf_asset.target("fulcio.crt.pem"), @@ -151,25 +160,42 @@ def _pem_keys(keys): ] ] + # Assert that trust root from TUF contains the expected keys/certs + trust_root = TrustedRoot.staging() + assert _der_keys(trust_root.get_ctfe_keys()) == ctfe_keys + assert _der_keys(trust_root.get_rekor_keys()) == rekor_keys + assert trust_root.get_fulcio_certs() == fulcio_certs + + # Assert that trust root from offline TUF contains the expected keys/certs + trust_root = TrustedRoot.staging(offline=True) + assert _der_keys(trust_root.get_ctfe_keys()) == ctfe_keys + assert _der_keys(trust_root.get_rekor_keys()) == rekor_keys + assert trust_root.get_fulcio_certs() == fulcio_certs + + # Assert that trust root from file contains the expected keys/certs + path = tuf_asset.target_path("trusted_root.json") + trust_root = TrustedRoot.from_file(path) + assert _der_keys(trust_root.get_ctfe_keys()) == ctfe_keys + assert _der_keys(trust_root.get_rekor_keys()) == rekor_keys + assert trust_root.get_fulcio_certs() == fulcio_certs + -def test_updater_instance_error(): +def test_trust_root_tuf_instance_error(): with pytest.raises(RootError): - TrustUpdater("foo.bar") + TrustedRoot.from_tuf("foo.bar") -def test_updater_ctfe_keys_error(monkeypatch): - updater = TrustUpdater.staging() - trusted_root = pretend.stub(ctlogs=[]) - monkeypatch.setattr(updater, "_get_trusted_root", lambda: trusted_root) - with pytest.raises(Exception, match="CTFE keys not found in TUF metadata"): - updater.get_ctfe_keys() +def test_trust_root_tuf_ctfe_keys_error(monkeypatch): + trust_root = TrustedRoot.staging(offline=True) + monkeypatch.setattr(trust_root, "ctlogs", []) + with pytest.raises(Exception, match="Active CTFE keys not found in trusted root"): + trust_root.get_ctfe_keys() -def test_updater_fulcio_certs_error(tuf_asset, monkeypatch): - updater = TrustUpdater.staging() - trusted_root = pretend.stub(certificate_authorities=[]) - monkeypatch.setattr(updater, "_get_trusted_root", lambda: trusted_root) +def test_trust_root_fulcio_certs_error(tuf_asset, monkeypatch): + trust_root = TrustedRoot.staging(offline=True) + monkeypatch.setattr(trust_root, "certificate_authorities", []) with pytest.raises( - Exception, match="Fulcio certificates not found in TUF metadata" + Exception, match="Fulcio certificates not found in trusted root" ): - updater.get_fulcio_certs() + trust_root.get_fulcio_certs() diff --git a/test/unit/verify/test_models.py b/test/unit/verify/test_models.py index a22b5bd88..e5dfe11e3 100644 --- a/test/unit/verify/test_models.py +++ b/test/unit/verify/test_models.py @@ -16,7 +16,7 @@ import pytest from sigstore._internal.rekor.client import RekorClient -from sigstore._internal.tuf import TrustUpdater +from sigstore._internal.trustroot import TrustedRoot from sigstore.verify.models import ( InvalidMaterials, InvalidRekorEntry, @@ -45,8 +45,8 @@ def test_verification_materials_retrieves_rekor_entry(self, signing_materials): materials = signing_materials("a.txt") assert materials._rekor_entry is None - tuf = TrustUpdater.staging() - client = RekorClient.staging(tuf) + trust_root = TrustedRoot.staging() + client = RekorClient.staging(trust_root) entry = materials.rekor_entry(client) assert entry is not None