diff --git a/e2e/testdata/porch/rpkg-update/config.yaml b/e2e/testdata/porch/rpkg-update/config.yaml
index 8ea23fc7c..06f86a33c 100644
--- a/e2e/testdata/porch/rpkg-update/config.yaml
+++ b/e2e/testdata/porch/rpkg-update/config.yaml
@@ -61,6 +61,7 @@ commands:
     stdout: |
       NAME PACKAGE WORKSPACENAME REVISION LATEST LIFECYCLE REPOSITORY
       git-3f036055f7ba68706372cbe0c4b14d553794f7c4 basens-edit update-1 false Draft git
+      git-804ab1a9d043e44255ef3fb77820d5ad7b1576a9 basens-edit update-3 main false Published git
       git-7fcdd499f0790ac3bd8f805e3e5e00825641eb60 basens-edit update-3 v1 true Published git
       git-7ab0219ace10c1081a8b40a6b97d5da58bdb62e0 basens-edit-clone update-2 false Draft git
   - args:
@@ -82,6 +83,7 @@ commands:
     stdout: |
       PACKAGE REVISION UPSTREAM REPOSITORY UPSTREAM UPDATES
       git-3f036055f7ba68706372cbe0c4b14d553794f7c4 No update available
+      git-804ab1a9d043e44255ef3fb77820d5ad7b1576a9 No update available
       git-7fcdd499f0790ac3bd8f805e3e5e00825641eb60 No update available
       git-7ab0219ace10c1081a8b40a6b97d5da58bdb62e0 git v1
   - args:
@@ -110,6 +112,7 @@ commands:
     stdout: |
       PACKAGE REVISION UPSTREAM REPOSITORY UPSTREAM UPDATES
       git-3f036055f7ba68706372cbe0c4b14d553794f7c4 No update available
+      git-804ab1a9d043e44255ef3fb77820d5ad7b1576a9 No update available
       git-7fcdd499f0790ac3bd8f805e3e5e00825641eb60 No update available
       git-7ab0219ace10c1081a8b40a6b97d5da58bdb62e0 git No update available
   - args:
diff --git a/porch/pkg/cache/draft.go b/porch/pkg/cache/draft.go
index 80b313d5d..ddf8ca290 100644
--- a/porch/pkg/cache/draft.go
+++ b/porch/pkg/cache/draft.go
@@ -16,6 +16,7 @@ package cache
 
 import (
     "context"
+    "fmt"
 
     "github.com/GoogleContainerTools/kpt/porch/pkg/repository"
 )
@@ -28,9 +29,20 @@ type cachedDraft struct {
 var _ repository.PackageDraft = &cachedDraft{}
 
 func (cd *cachedDraft) Close(ctx context.Context) (repository.PackageRevision, error) {
-    if closed, err := cd.PackageDraft.Close(ctx); err != nil {
+    closed, err := cd.PackageDraft.Close(ctx)
+    if err != nil {
         return nil, err
-    } else {
-        return cd.cache.update(ctx, closed)
     }
+
+    err = cd.cache.reconcileCache(ctx, "close-draft")
+    if err != nil {
+        return nil, err
+    }
+
+    cpr := cd.cache.getPackageRevision(closed.Key())
+    if cpr == nil {
+        return nil, fmt.Errorf("closed draft not found")
+    }
+
+    return cpr, nil
 }
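The cache/draft.go change above replaces the old in-place cache.update path: closing a draft now forces a full reconcile and then reads the closed revision back out of the refreshed cache, failing if it is missing. A minimal, self-contained sketch of that pattern, using hypothetical simplified types rather than the real porch ones:

package main

import (
	"context"
	"fmt"
)

// revision and revisionCache are hypothetical stand-ins for the real
// cachedPackageRevision / cachedRepository types.
type revision struct{ key string }

type revisionCache struct {
	revisions map[string]*revision
}

// reconcile would re-list the backing repository and rebuild the map;
// it is a no-op placeholder here.
func (c *revisionCache) reconcile(ctx context.Context, reason string) error { return nil }

// closeDraft mirrors the new flow: reconcile first, then return the revision
// exactly as the cache now sees it, or an error if it is not there.
func closeDraft(ctx context.Context, c *revisionCache, key string) (*revision, error) {
	if err := c.reconcile(ctx, "close-draft"); err != nil {
		return nil, err
	}
	r, ok := c.revisions[key]
	if !ok {
		return nil, fmt.Errorf("closed draft %q not found after reconcile", key)
	}
	return r, nil
}

func main() {
	c := &revisionCache{revisions: map[string]*revision{"git/basens-edit/update-1": {key: "git/basens-edit/update-1"}}}
	r, err := closeDraft(context.Background(), c, "git/basens-edit/update-1")
	fmt.Println(r.key, err)
}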
diff --git a/porch/pkg/cache/repository.go b/porch/pkg/cache/repository.go
index 4772c1223..45b771388 100644
--- a/porch/pkg/cache/repository.go
+++ b/porch/pkg/cache/repository.go
@@ -68,6 +68,11 @@ type cachedRepository struct {
     reconcileMutex sync.Mutex
     cachedPackageRevisions map[repository.PackageRevisionKey]*cachedPackageRevision
+    // not ideal but this is another cache, used by the underlying storage to avoid
+    // reloading. Would be best to combine these somehow, but not doing that now.
+    // Eventual CRD-based redesign should make this entire repo cache obsolete
+    packageRevisionCache repository.PackageRevisionCache
+
     // TODO: Currently we support repositories with homogenous content (only packages xor functions). Model this more optimally?
     cachedFunctions []repository.Function
 
     // Error encountered on repository refresh by the refresh goroutine.
@@ -97,6 +102,10 @@ func newRepository(id string, repoSpec *configapi.Repository, repo repository.Re
     return r
 }
 
+func (r *cachedRepository) nn() string {
+    return r.repoSpec.Namespace + "/" + r.repoSpec.Name
+}
+
 func (r *cachedRepository) Version(ctx context.Context) (string, error) {
     return r.repo.Version(ctx)
 }
@@ -118,6 +127,14 @@ func (r *cachedRepository) ListFunctions(ctx context.Context) ([]repository.Func
     return functions, nil
 }
 
+func (r *cachedRepository) getPackageRevision(key repository.PackageRevisionKey) *cachedPackageRevision {
+    r.mutex.RLock()
+    defer r.mutex.RUnlock()
+
+    cpr, _ := r.cachedPackageRevisions[key]
+    return cpr
+}
+
 func (r *cachedRepository) getRefreshError() error {
     r.mutex.RLock()
     defer r.mutex.RUnlock()
@@ -161,7 +178,7 @@ func (r *cachedRepository) blockUntilLoaded(ctx context.Context) error {
     for {
         select {
         case <-ctx.Done():
-            return fmt.Errorf("repo %s: stopped waiting for load because context is done: %v", r.id, ctx.Err())
+            return fmt.Errorf("repo %s: stopped waiting for load because context is done: %v", r.nn(), ctx.Err())
         default:
             r.mutex.RLock()
             if r.cachedPackageRevisions != nil {
@@ -209,6 +226,11 @@ func (r *cachedRepository) CreatePackageRevision(ctx context.Context, obj *v1alp
         return nil, err
     }
 
+    // reconciliation is faster now, so force it immediately
+    if err := r.reconcileCache(ctx, "create"); err != nil {
+        klog.Warningf("error reconciling cache after creating %v in %s: %v", created, r.nn(), err)
+    }
+
     return &cachedDraft{
         PackageDraft: created,
         cache: r,
@@ -223,71 +245,28 @@ func (r *cachedRepository) UpdatePackageRevision(ctx context.Context, old reposi
         return nil, err
     }
 
+    // reconciliation is faster now, so force it immediately
+    if err := r.reconcileCache(ctx, "update"); err != nil {
+        klog.Warningf("error reconciling cache after updating %v in %s: %v", unwrapped.Key(), r.nn(), err)
+    }
+
     return &cachedDraft{
         PackageDraft: created,
         cache: r,
     }, nil
 }
 
-func (r *cachedRepository) update(ctx context.Context, updated repository.PackageRevision) (*cachedPackageRevision, error) {
-    err := r.blockUntilLoaded(ctx)
-    if err != nil {
-        return nil, err
-    }
-
-    // we need the reconcileMutex, get it first
-    r.reconcileMutex.Lock()
-    defer r.reconcileMutex.Unlock()
-
-    // we will also need the cache lock
-    r.mutex.Lock()
-    defer r.mutex.Unlock()
-
-    k := updated.Key()
-    // previous := r.cachedPackageRevisions[k]
-
-    if v1alpha1.LifecycleIsPublished(updated.Lifecycle()) {
-        oldKey := repository.PackageRevisionKey{
-            Repository: k.Repository,
-            Package: k.Package,
-            WorkspaceName: k.WorkspaceName,
-        }
-        if _, ok := r.cachedPackageRevisions[oldKey]; ok {
-            delete(r.cachedPackageRevisions, oldKey)
-        }
-    }
-
-    cached := &cachedPackageRevision{PackageRevision: updated}
-    r.cachedPackageRevisions[k] = cached
-
-    // Recompute latest package revisions.
-    // TODO: Just updated package?
-    identifyLatestRevisions(r.cachedPackageRevisions)
-
-    return cached, nil
-}
-
 func (r *cachedRepository) DeletePackageRevision(ctx context.Context, old repository.PackageRevision) error {
-    // get the reconcile lock first, before touching the underlying repo
-    r.reconcileMutex.Lock()
-    defer r.reconcileMutex.Unlock()
-
     // Unwrap
     unwrapped := old.(*cachedPackageRevision).PackageRevision
     if err := r.repo.DeletePackageRevision(ctx, unwrapped); err != nil {
         return err
     }
 
-    r.mutex.Lock()
-    if r.cachedPackageRevisions != nil {
-        k := old.Key()
-        delete(r.cachedPackageRevisions, k)
-
-        // Recompute latest package revisions.
-        // TODO: Only for affected object / key?
-        identifyLatestRevisions(r.cachedPackageRevisions)
+    // reconciliation is faster now, so force it immediately
+    if err := r.reconcileCache(ctx, "delete"); err != nil {
+        klog.Warningf("error reconciling cache after deleting %v in %s: %v", unwrapped.Key(), r.nn(), err)
     }
-    r.mutex.Unlock()
 
     return nil
 }
@@ -324,7 +303,7 @@ func (r *cachedRepository) Close() error {
             // There isn't really any correct way to handle finalizers here. We are removing
             // the repository, so we have to just delete the PackageRevision regardless of any
             // finalizers.
-            klog.Infof("repo %s: deleting packagerev %s/%s because repository is closed", r.id, nn.Namespace, nn.Name)
+            klog.Infof("repo %s: deleting packagerev %s/%s because repository is closed", r.nn(), nn.Namespace, nn.Name)
             pkgRevMeta, err := r.metadataStore.Delete(context.TODO(), nn, true)
             if err != nil {
                 // There isn't much use in returning an error here, so we just log it
@@ -338,7 +317,7 @@ func (r *cachedRepository) Close() error {
         }
         sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta)
     }
-    klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions))
+    klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.nn(), sent, len(r.cachedPackageRevisions))
     return r.repo.Close()
 }
 
@@ -348,7 +327,7 @@ func (r *cachedRepository) pollForever(ctx context.Context, repoSyncFrequency ti
     for {
         select {
         case <-ctx.Done():
-            klog.V(2).Infof("repo %s: exiting repository poller, because context is done: %v", r.id, ctx.Err())
+            klog.V(2).Infof("repo %s: exiting repository poller, because context is done: %v", r.nn(), ctx.Err())
             return
         default:
             r.pollOnce(ctx)
@@ -358,27 +337,21 @@ func (r *cachedRepository) pollForever(ctx context.Context, repoSyncFrequency ti
 }
 
 func (r *cachedRepository) pollOnce(ctx context.Context) {
-    start := time.Now()
-    klog.Infof("repo %s: poll started", r.id)
-    defer func() { klog.Infof("repo %s: poll finished in %f secs", r.id, time.Since(start).Seconds()) }()
     ctx, span := tracer.Start(ctx, "Repository::pollOnce", trace.WithAttributes())
     defer span.End()
 
-    if err := r.reconcileCache(ctx); err != nil {
-        klog.Warningf("error polling repo packages %s: %v", r.id, err)
+    if err := r.reconcileCache(ctx, "poll"); err != nil {
+        klog.Warningf("error polling repo packages %s: %v", r.nn(), err)
     }
     if _, err := r.getFunctions(ctx, true); err != nil {
-        klog.Warningf("error polling repo functions %s: %v", r.id, err)
+        klog.Warningf("error polling repo functions %s: %v", r.nn(), err)
     }
 }
 
 // reconcileCache updates the cached map for this repository
 // it also triggers notifications for all package changes
 // caller must NOT hold any locks
-func (r *cachedRepository) reconcileCache(ctx context.Context) error {
-    // TODO: Avoid simultaneous fetches?
-    // TODO: Push-down partial refresh?
-
+func (r *cachedRepository) reconcileCache(ctx context.Context, reason string) error {
     // if this is not a package repo, just set the cache to "loaded" and return
     if r.repoSpec.Spec.Content != configapi.RepositoryContentPackage {
         r.mutex.Lock()
@@ -391,7 +364,9 @@ func (r *cachedRepository) reconcileCache(ctx context.Context) error {
     }
 
     start := time.Now()
-    defer func() { klog.Infof("repo %s: refresh finished in %f secs", r.id, time.Since(start).Seconds()) }()
+    defer func() {
+        klog.Infof("repo %s: reconcile for %s finished in %f secs", r.nn(), reason, time.Since(start).Seconds())
+    }()
 
     curVer, err := r.Version(ctx)
     if err != nil {
@@ -414,31 +389,38 @@ func (r *cachedRepository) reconcileCache(ctx context.Context) error {
         }
     }
 
-    // Look up all existing PackageRevCRs so we an compare those to the
-    // actual Packagerevisions found in git/oci, and add/prune PackageRevCRs
+    // Look up all existing PackageRevCRs so we can compare those to the
+    // actual PackageRevisions found in git/oci, and add/prune PackageRevCRs
     // as necessary.
     existingPkgRevCRs, err := r.metadataStore.List(ctx, r.repoSpec)
     if err != nil {
         return err
     }
+
     // Create a map so we can quickly check if a specific PackageRevisionMeta exists.
-    existingPkgRevCRsMap := make(map[string]meta.PackageRevisionMeta)
+    pkgRevCRsMap := make(map[string]meta.PackageRevisionMeta)
     for i := range existingPkgRevCRs {
         pr := existingPkgRevCRs[i]
-        existingPkgRevCRsMap[pr.Name] = pr
+        pkgRevCRsMap[pr.Name] = pr
     }
 
-    newPackageRevisions, err := r.repo.ListPackageRevisions(ctx, repository.ListPackageRevisionFilter{})
+    ctxWithCache := repository.ContextWithPackageRevisionCache(ctx, r.packageRevisionCache)
+    newPackageRevisions, err := r.repo.ListPackageRevisions(ctxWithCache, repository.ListPackageRevisionFilter{})
     if err != nil {
         return fmt.Errorf("error listing packages: %w", err)
     }
 
     // Build mapping from kubeObjectName to PackageRevisions for new PackageRevisions.
+    // and also recreate packageRevisionCache
+    prc := make(repository.PackageRevisionCache, len(newPackageRevisions))
     newPackageRevisionNames := make(map[string]*cachedPackageRevision, len(newPackageRevisions))
     for _, newPackage := range newPackageRevisions {
+        cid := newPackage.CachedIdentifier()
+        prc[cid.Key] = repository.PackageRevisionCacheEntry{Version: cid.Version, PackageRevision: newPackage}
+
         kname := newPackage.KubeObjectName()
         if newPackageRevisionNames[kname] != nil {
-            klog.Warningf("repo %s: found duplicate packages with name %v", r.id, kname)
+            klog.Warningf("repo %s: found duplicate packages with name %v", r.nn(), kname)
         }
 
         pkgRev := &cachedPackageRevision{
@@ -457,13 +439,15 @@ func (r *cachedRepository) reconcileCache(ctx context.Context) error {
     }
     r.mutex.RUnlock()
 
+    addMeta := 0
+    delMeta := 0
+
     // We go through all PackageRev CRs that represents PackageRevisions
     // in the current repo and make sure they all have a corresponding
     // PackageRevision. The ones that doesn't is removed.
     for _, prm := range existingPkgRevCRs {
         if _, found := newPackageRevisionNames[prm.Name]; !found {
-            klog.Infof("repo %s: deleting PackageRev %s/%s because parent PackageRevision was not found",
-                r.id, prm.Namespace, prm.Name)
+            delMeta += 1
             if _, err := r.metadataStore.Delete(ctx, types.NamespacedName{
                 Name: prm.Name,
                 Namespace: prm.Namespace,
             }, true); err != nil {
                 if !apierrors.IsNotFound(err) {
                     // This will be retried the next time the sync runs.
                     klog.Warningf("repo %s: unable to delete PackageRev CR for %s/%s: %w",
-                        r.id, prm.Name, prm.Namespace, err)
+                        r.nn(), prm.Name, prm.Namespace, err)
                 }
             }
         }
     }
@@ -480,17 +464,21 @@ func (r *cachedRepository) reconcileCache(ctx context.Context) error {
     // We go through all the PackageRevisions and make sure they have
     // a corresponding PackageRev CR.
     for pkgRevName, pkgRev := range newPackageRevisionNames {
-        if _, found := existingPkgRevCRsMap[pkgRevName]; !found {
+        if _, found := pkgRevCRsMap[pkgRevName]; !found {
             pkgRevMeta := meta.PackageRevisionMeta{
                 Name: pkgRevName,
                 Namespace: r.repoSpec.Namespace,
             }
-            if _, err := r.metadataStore.Create(ctx, pkgRevMeta, r.repoSpec.Name, pkgRev.UID()); err != nil {
+            addMeta += 1
+            if created, err := r.metadataStore.Create(ctx, pkgRevMeta, r.repoSpec.Name, pkgRev.UID()); err != nil {
                 // TODO: We should try to find a way to make these errors available through
                 // either the repository CR or the PackageRevision CR. This will be
                 // retried on the next sync.
                 klog.Warningf("unable to create PackageRev CR for %s/%s: %w",
                     r.repoSpec.Namespace, pkgRevName, err)
+            } else {
+                // add to the cache for notifications later
+                pkgRevCRsMap[pkgRevName] = created
             }
         }
     }
@@ -513,6 +501,7 @@ func (r *cachedRepository) reconcileCache(ctx context.Context) error {
     // anyone responding to the notification will get the new values
     r.mutex.Lock()
     r.cachedPackageRevisions = newPackageRevisionMap
+    r.packageRevisionCache = prc
     r.lastVersion = curVer
     r.mutex.Unlock()
 
@@ -521,8 +510,9 @@ func (r *cachedRepository) reconcileCache(ctx context.Context) error {
     modSent := 0
     for kname, newPackage := range newPackageRevisionNames {
         oldPackage := oldPackageRevisionNames[kname]
-        metaPackage, found := existingPkgRevCRsMap[newPackage.KubeObjectName()]
+        metaPackage, found := pkgRevCRsMap[newPackage.KubeObjectName()]
         if !found {
+            // should never happen
            klog.Warningf("no PackageRev CR found for PackageRevision %s", newPackage.KubeObjectName())
         }
         if oldPackage == nil {
@@ -535,28 +525,16 @@ func (r *cachedRepository) reconcileCache(ctx context.Context) error {
     }
 
     delSent := 0
-    // Send notifications for packages that was deleted in the SoT
+    // Send notifications for packages that were deleted in the SoT
     for kname, oldPackage := range oldPackageRevisionNames {
         if newPackageRevisionNames[kname] == nil {
-            nn := types.NamespacedName{
+            metaPackage := meta.PackageRevisionMeta{
                 Name: oldPackage.KubeObjectName(),
                 Namespace: oldPackage.KubeObjectNamespace(),
             }
-            klog.Infof("repo %s: deleting PackageRev %s/%s because PackageRevision was removed from SoT",
-                r.id, nn.Namespace, nn.Name)
-            metaPackage, err := r.metadataStore.Delete(ctx, nn, true)
-            if err != nil {
-                if !apierrors.IsNotFound(err) {
-                    klog.Warningf("repo %s: error deleting PkgRevMeta %s: %v", r.id, nn, err)
-                }
-                metaPackage = meta.PackageRevisionMeta{
-                    Name: nn.Name,
-                    Namespace: nn.Namespace,
-                }
-            }
             delSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage)
         }
     }
-    klog.Infof("repo %s: addSent %d, modSent %d, delSent for %d old and %d new repo packages", r.id, addSent, modSent, len(oldPackageRevisionNames), len(newPackageRevisionNames))
+    klog.Infof("repo %s: addMeta %d, delMeta %d, addSent %d, modSent %d, delSent %d for %d in-cache and %d in-storage package revisions", r.nn(), addMeta, delMeta, addSent, modSent, delSent, len(oldPackageRevisionNames), len(newPackageRevisionNames))
 
     return nil
 }
diff --git a/porch/pkg/engine/engine.go b/porch/pkg/engine/engine.go
index 1aeec8d44..62c7ae15e 100644
--- a/porch/pkg/engine/engine.go
+++ b/porch/pkg/engine/engine.go
@@ -308,7 +308,7 @@ func (cad *cadEngine) CreatePackageRevision(ctx context.Context, repositoryObj *
         return nil, fmt.Errorf("error listing package revisions: %w", err)
     }
 
-    if err := ensureUniqueWorkspaceName(obj, revs); err != nil {
+    if err := ensureUniqueWorkspaceName(repositoryObj, obj, revs); err != nil {
         return nil, err
     }
 
@@ -351,9 +351,20 @@ func (cad *cadEngine) CreatePackageRevision(ctx context.Context, repositoryObj *
 }
 
 // The workspaceName must be unique, because it used to generate the package revision's metadata.name.
-func ensureUniqueWorkspaceName(obj *api.PackageRevision, existingRevs []repository.PackageRevision) error {
+func ensureUniqueWorkspaceName(repositoryObj *configapi.Repository, obj *api.PackageRevision, existingRevs []repository.PackageRevision) error {
+    // HACK
+    // It's ok for the "main" revision to have the same workspace name
+    // So ignore main revisions in this calculation
+    mainRev := ""
+    if repositoryObj.Spec.Git != nil {
+        mainRev = repositoryObj.Spec.Git.Branch
+    }
+
     for _, r := range existingRevs {
         k := r.Key()
+        if mainRev != "" && k.Revision == mainRev {
+            continue
+        }
         if k.WorkspaceName == obj.Spec.WorkspaceName {
             return fmt.Errorf("package revision workspaceNames must be unique; package revision with name %s in repo %s with "+
                 "workspaceName %s already exists", obj.Spec.PackageName, obj.Spec.RepositoryName, obj.Spec.WorkspaceName)
diff --git a/porch/pkg/engine/fake/packagerevision.go b/porch/pkg/engine/fake/packagerevision.go
index d8a4c1ec9..5f49931d4 100644
--- a/porch/pkg/engine/fake/packagerevision.go
+++ b/porch/pkg/engine/fake/packagerevision.go
@@ -35,6 +35,10 @@ type PackageRevision struct {
     Kptfile kptfile.KptFile
 }
 
+func (pr *PackageRevision) CachedIdentifier() repository.CachedIdentifier {
+    return repository.CachedIdentifier{Key: pr.Key().String(), Version: pr.Key().Revision}
+}
+
 func (pr *PackageRevision) KubeObjectName() string {
     return pr.Name
 }
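The engine.go change above relaxes the workspace-name uniqueness check only for revisions that sit on the repository's Git branch, so the pseudo-revision on main no longer collides with the workspace that produced it. A simplified, self-contained illustration of that filter (plain structs standing in for the engine types):

package example

import "fmt"

// key is a simplified stand-in for repository.PackageRevisionKey.
type key struct {
	WorkspaceName string
	Revision      string
}

// checkUniqueWorkspaceName rejects a proposed workspace name already used by
// an existing revision, ignoring revisions that live on the main branch.
func checkUniqueWorkspaceName(proposed, mainBranch string, existing []key) error {
	for _, k := range existing {
		if mainBranch != "" && k.Revision == mainBranch {
			continue // the copy on main may legitimately share the workspace name
		}
		if k.WorkspaceName == proposed {
			return fmt.Errorf("workspaceName %q already exists", proposed)
		}
	}
	return nil
}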
diff --git a/porch/pkg/git/git.go b/porch/pkg/git/git.go
index bbf1f8450..50b5c650b 100644
--- a/porch/pkg/git/git.go
+++ b/porch/pkg/git/git.go
@@ -278,6 +278,14 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit
     mainBranch := r.branch.RefInLocal() // Looking for the registered branch
 
+    // if a cache is available, use it
+    cache := repository.PackageRevisionCacheFromContext(ctx)
+    draftCache := 0
+    tagCache := 0
+    mainCache := 0
+    draftLoaded := 0
+    tagLoaded := 0
+    mainLoaded := 0
     for {
         ref, err := refs.Next()
         if err == io.EOF {
@@ -290,9 +298,25 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit
             continue
 
         case isProposedBranchNameInLocal(ref.Name()), isDraftBranchNameInLocal(ref.Name()):
-            draft, err := r.loadDraft(ctx, ref)
-            if err != nil {
-                return nil, fmt.Errorf("failed to load package draft %q: %w", name.String(), err)
+            var draft *gitPackageRevision
+            if entry, ok := cache[ref.Name().String()]; ok {
+                if entry.Version == ref.Hash().String() {
+                    dd, good := entry.PackageRevision.(*gitPackageRevision)
+                    if !good {
+                        klog.Warningf("Found current cached branch %s version %s, but it is not a gitPackageRevision", ref.Name(), entry.Version)
+                    } else {
+                        draft = dd
+                        draftCache += 1
+                    }
+                }
+            }
+
+            if draft == nil {
+                draft, err = r.loadDraft(ctx, ref)
+                if err != nil {
+                    return nil, fmt.Errorf("failed to load package draft %q: %w", name.String(), err)
+                }
+                draftLoaded += 1
             }
             if draft != nil {
                 drafts = append(drafts, draft)
@@ -300,24 +324,61 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit
                 klog.Warningf("no package draft found for ref %v", ref)
             }
         case isTagInLocalRepo(ref.Name()):
-            tagged, err := r.loadTaggedPackages(ctx, ref)
-            if err != nil {
-                // this tag is not associated with any package (e.g. could be a release tag)
-                continue
+            var tagged *gitPackageRevision
+            if entry, ok := cache[ref.Name().String()]; ok {
+                if entry.Version == ref.Hash().String() {
+                    dd, good := entry.PackageRevision.(*gitPackageRevision)
+                    if !good {
+                        klog.Warningf("Found current cached tag %s version %s, but it is not a gitPackageRevision", ref.Name(), entry.Version)
+                    } else {
+                        tagged = dd
+                        tagCache += 1
+                    }
+                }
             }
-            for _, p := range tagged {
-                if filter.Matches(p) {
-                    result = append(result, p)
+            if tagged == nil {
+                tagged, err = r.loadTaggedPackage(ctx, ref)
+                if err != nil {
+                    // this tag is not associated with any package (e.g. could be a release tag)
+                    continue
                 }
+                tagLoaded += 1
+            }
+            if tagged != nil && filter.Matches(tagged) {
+                result = append(result, tagged)
             }
         }
     }
 
     if main != nil {
+        // Look for any package whose cached identifier starts with main.Name()
+        // There will be one for each package found in main, but they all will have the same
+        // hash. If that matches main.Hash() there is no change in main and so we can just
+        // copy all the packages rather than rediscovering.
+        var mainpkgs []*gitPackageRevision
+        for k, v := range cache {
+            if strings.Index(k, main.Name().String()) == 0 {
+                if v.Version != main.Hash().String() {
+                    continue
+                }
+                gpr, ok := v.PackageRevision.(*gitPackageRevision)
+                if !ok {
+                    klog.Warningf("Found current cached main package %s version %s, but it is not a gitPackageRevision", k, v.Version)
+                } else {
+                    mainpkgs = append(mainpkgs, gpr)
+                    mainCache += 1
+                }
+            }
+        }
         // TODO: ignore packages that are unchanged in main branch, compared to a tagged version?
-        mainpkgs, err := r.discoverFinalizedPackages(ctx, main)
-        if err != nil {
-            return nil, err
+        if len(mainpkgs) == 0 {
+            mp, err := r.discoverFinalizedPackages(ctx, main)
+            if err != nil {
+                return nil, err
+            }
+            mainpkgs = mp
+            mainLoaded = len(mainpkgs)
         }
         for _, p := range mainpkgs {
             if filter.Matches(p) {
@@ -332,6 +393,8 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit
         }
     }
 
+    klog.Infof("repo %s/%s: %d draftCache, %d draftLoaded, %d tagCache, %d tagLoaded, %d mainCache, %d mainLoaded", r.namespace, r.name,
+        draftCache, draftLoaded, tagCache, tagLoaded, mainCache, mainLoaded)
     return result, nil
 }
 
@@ -753,8 +816,8 @@ func parseDraftName(draft *plumbing.Reference) (name string, workspaceName v1alp
     return name, workspaceName, nil
 }
 
-func (r *gitRepository) loadTaggedPackages(ctx context.Context, tag *plumbing.Reference) ([]*gitPackageRevision, error) {
-    ctx, span := tracer.Start(ctx, "gitRepository::loadTaggedPackages", trace.WithAttributes())
+func (r *gitRepository) loadTaggedPackage(ctx context.Context, tag *plumbing.Reference) (*gitPackageRevision, error) {
+    ctx, span := tracer.Start(ctx, "gitRepository::loadTaggedPackage", trace.WithAttributes())
     defer span.End()
 
     name, ok := getTagNameInLocalRepo(tag.Name())
@@ -803,9 +866,7 @@ func (r *gitRepository) loadTaggedPackages(ctx context.Context, tag *plumbing.Re
         return nil, err
     }
 
-    return []*gitPackageRevision{
-        packageRevision,
-    }, nil
+    return packageRevision, nil
 }
 
@@ -1072,8 +1133,6 @@ func (r *gitRepository) pushAndCleanup(ctx context.Context, ph *pushRefSpecBuild
         return err
     }
 
-    klog.Infof("pushing refs: %v", specs)
-
     if err := r.doGitWithAuth(ctx, func(auth transport.AuthMethod) error {
         return r.repo.Push(&git.PushOptions{
             RemoteName: OriginName,
@@ -1118,7 +1177,12 @@ func (r *gitRepository) loadTasks(ctx context.Context, startCommit *object.Commi
 
     var tasks []v1alpha1.Task
 
+    done := false
     visitCommit := func(commit *object.Commit) error {
+        if done {
+            return nil
+        }
+
         gitAnnotations, err := ExtractGitAnnotations(commit)
         if err != nil {
             return err
@@ -1143,6 +1207,7 @@ func (r *gitRepository) loadTasks(ctx context.Context, startCommit *object.Commi
             if gitAnnotation.Task != nil && (gitAnnotation.Task.Type == v1alpha1.TaskTypeClone || gitAnnotation.Task.Type == v1alpha1.TaskTypeInit) {
                 // we have reached the beginning of this package revision and don't need to
                 // continue further
+                done = true
                 break
             }
         }
@@ -1692,9 +1757,6 @@ func (r *gitRepository) discoverPackagesInTree(commit *object.Commit, opt Discov
         return nil, err
     }
 
-    if opt.FilterPrefix == "" {
-        klog.Infof("discovered %d packages @%v", len(t.packages), commit.Hash)
-    }
     return t, nil
 }
diff --git a/porch/pkg/git/package.go b/porch/pkg/git/package.go
index cec03a02c..9a0772bf1 100644
--- a/porch/pkg/git/package.go
+++ b/porch/pkg/git/package.go
@@ -78,6 +78,18 @@ func (p *gitPackageRevision) UID() types.UID {
     return p.uid()
 }
 
+func (p *gitPackageRevision) CachedIdentifier() repository.CachedIdentifier {
+    if p.ref != nil {
+        k := p.ref.Name().String()
+        if p.revision == string(p.repo.branch) {
+            k += ":" + p.path
+        }
+        return repository.CachedIdentifier{Key: k, Version: p.ref.Hash().String()}
+    }
+
+    return repository.CachedIdentifier{}
+}
+
 func (p *gitPackageRevision) ResourceVersion() string {
     return p.commit.String()
 }
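In the git.go change above, the cache key for draft and tag refs is the ref name and the version is the ref hash, so an entry is only reused while the ref has not moved; main-branch packages additionally append ":"+path to the key (see CachedIdentifier in package.go). A hedged sketch of that reuse check, assuming go-git's plumbing package; the real code also type-asserts to the unexported *gitPackageRevision, which is omitted here:

package example

import (
	"github.com/GoogleContainerTools/kpt/porch/pkg/repository"
	"github.com/go-git/go-git/v5/plumbing"
)

// cachedRevisionForRef returns the previously listed PackageRevision for a
// draft or tag ref, but only if it was built from the same ref hash.
// A nil result means the caller should load the package from the repository.
func cachedRevisionForRef(cache repository.PackageRevisionCache, ref *plumbing.Reference) repository.PackageRevision {
	entry, ok := cache[ref.Name().String()]
	if !ok || entry.Version != ref.Hash().String() {
		return nil
	}
	return entry.PackageRevision
}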
diff --git a/porch/pkg/oci/oci.go b/porch/pkg/oci/oci.go
index e58a5e685..fac7ccf5c 100644
--- a/porch/pkg/oci/oci.go
+++ b/porch/pkg/oci/oci.go
@@ -402,6 +402,10 @@ type ociPackageRevision struct {
     lifecycle v1alpha1.PackageRevisionLifecycle
 }
 
+func (p *ociPackageRevision) CachedIdentifier() repository.CachedIdentifier {
+    return repository.CachedIdentifier{Key: p.packageName + ":" + string(p.workspaceName), Version: p.resourceVersion}
+}
+
 var _ repository.PackageRevision = &ociPackageRevision{}
 
 func (p *ociPackageRevision) GetResources(ctx context.Context) (*v1alpha1.PackageRevisionResources, error) {
diff --git a/porch/pkg/repository/repository.go b/porch/pkg/repository/repository.go
index 9b8a89a2b..858ef9585 100644
--- a/porch/pkg/repository/repository.go
+++ b/porch/pkg/repository/repository.go
@@ -48,6 +48,37 @@ func (n PackageKey) String() string {
     return fmt.Sprintf("Repository: %q, Package: %q", n.Repository, n.Package)
 }
 
+// CachedIdentifier is used by the cache and the underlying storage
+// implementation to avoid unnecessary reloads
+type CachedIdentifier struct {
+    // Key uniquely identifies the resource in the underlying storage
+    Key string
+
+    // Version uniquely identifies the version of the resource in the underlying storage
+    Version string
+}
+
+type PackageRevisionCacheEntry struct {
+    Version string
+    PackageRevision PackageRevision
+}
+
+type PackageRevisionCache map[string]PackageRevisionCacheEntry
+
+type packageCacheKey struct{}
+
+func ContextWithPackageRevisionCache(ctx context.Context, cache PackageRevisionCache) context.Context {
+    return context.WithValue(ctx, packageCacheKey{}, cache)
+}
+
+func PackageRevisionCacheFromContext(ctx context.Context) PackageRevisionCache {
+    cache, ok := ctx.Value(packageCacheKey{}).(PackageRevisionCache)
+    if !ok {
+        cache = make(PackageRevisionCache)
+    }
+    return cache
+}
+
 // PackageRevision is an abstract package version.
 // We have a single object for both Revision and Resources, because conceptually they are one object.
 // The best way we've found (so far) to represent them in k8s is as two resources, but they map to the same object.
@@ -67,6 +98,9 @@ type PackageRevision interface {
     // Key returns the "primary key" of the package.
     Key() PackageRevisionKey
 
+    // CachedIdentifier returns a unique identifier for this package revision and its version
+    CachedIdentifier() CachedIdentifier
+
     // Lifecycle returns the current lifecycle state of the package.
     Lifecycle() v1alpha1.PackageRevisionLifecycle
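On the producer side, reconcileCache rebuilds this cache from the previous listing and attaches it to the context before calling ListPackageRevisions. A hedged sketch of that round trip as a standalone helper (relistWithCache is hypothetical; the repository types and functions are the ones added above):

package example

import (
	"context"

	"github.com/GoogleContainerTools/kpt/porch/pkg/repository"
)

// relistWithCache seeds the per-call cache from a previous listing before
// asking the backend for a fresh one, mirroring what reconcileCache does.
func relistWithCache(ctx context.Context, repo repository.Repository,
	previous []repository.PackageRevision) ([]repository.PackageRevision, error) {

	prc := make(repository.PackageRevisionCache, len(previous))
	for _, pr := range previous {
		cid := pr.CachedIdentifier()
		prc[cid.Key] = repository.PackageRevisionCacheEntry{Version: cid.Version, PackageRevision: pr}
	}

	ctx = repository.ContextWithPackageRevisionCache(ctx, prc)
	return repo.ListPackageRevisions(ctx, repository.ListPackageRevisionFilter{})
}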