Skip to content

Commit

Permalink
feat: apply kstatus feature from ocm-controller (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
Skarlso authored Nov 14, 2023
1 parent 5b7bd85 commit 9b57691
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 150 deletions.
13 changes: 13 additions & 0 deletions api/v1alpha1/componentsubscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package v1alpha1

import (
"fmt"
"time"

"github.com/fluxcd/pkg/apis/meta"
Expand Down Expand Up @@ -104,6 +105,18 @@ type ComponentSubscriptionStatus struct {
Conditions []metav1.Condition `json:"conditions,omitempty"`
}

func (in *ComponentSubscription) GetVID() map[string]string {
vid := fmt.Sprintf("%s:%s", in.Status.LastAttemptedVersion, in.Status.LastAppliedVersion)
metadata := make(map[string]string)
metadata[GroupVersion.Group+"/component_subscription"] = vid

return metadata
}

func (in *ComponentSubscription) SetObservedGeneration(v int64) {
in.Status.ObservedGeneration = v
}

// GetConditions returns the conditions of the ComponentVersion.
func (in *ComponentSubscription) GetConditions() []metav1.Condition {
return in.Status.Conditions
Expand Down
190 changes: 146 additions & 44 deletions controllers/componentsubscription_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,128 @@ import (
"time"

"github.com/Masterminds/semver/v3"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
"github.com/fluxcd/pkg/runtime/patch"
rreconcile "github.com/fluxcd/pkg/runtime/reconcile"
"github.com/open-component-model/ocm-controller/pkg/event"
"github.com/open-component-model/ocm-controller/pkg/status"
"github.com/open-component-model/replication-controller/api/v1alpha1"
"github.com/open-component-model/replication-controller/pkg/ocm"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/open-component-model/replication-controller/api/v1alpha1"
"github.com/open-component-model/replication-controller/pkg/ocm"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// ComponentSubscriptionReconciler reconciles a ComponentSubscription object
type ComponentSubscriptionReconciler struct {
client.Client
Scheme *runtime.Scheme

OCMClient ocm.Contract
OCMClient ocm.Contract
EventRecorder record.EventRecorder
}

// SetupWithManager sets up the controller with the Manager.
func (r *ComponentSubscriptionReconciler) SetupWithManager(mgr ctrl.Manager) error {
const (
sourceKey = ".metadata.source.secretRef"
destinationKey = ".metadata.destination.secretRef"
)

if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &v1alpha1.ComponentSubscription{}, sourceKey, func(rawObj client.Object) []string {
obj, ok := rawObj.(*v1alpha1.ComponentSubscription)
if !ok {
return []string{}
}
if obj.Spec.Source.SecretRef == nil {
return []string{}
}

ns := obj.GetNamespace()
return []string{fmt.Sprintf("%s/%s", ns, obj.Spec.Source.SecretRef.Name)}
}); err != nil {
return fmt.Errorf("failed setting index fields: %w", err)
}

if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &v1alpha1.ComponentSubscription{}, sourceKey, func(rawObj client.Object) []string {
obj, ok := rawObj.(*v1alpha1.ComponentSubscription)
if !ok {
return []string{}
}
if obj.Spec.Destination.SecretRef == nil {
return []string{}
}

ns := obj.GetNamespace()
return []string{fmt.Sprintf("%s/%s", ns, obj.Spec.Destination.SecretRef.Name)}
}); err != nil {
return fmt.Errorf("failed setting index fields: %w", err)
}

return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.ComponentSubscription{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(
&source.Kind{Type: &corev1.Secret{}},
handler.EnqueueRequestsFromMapFunc(r.findObjects(sourceKey, destinationKey))).
Complete(r)
}

// findObjects finds component versions that have a key for the secret that triggered this watch event.
func (r *ComponentSubscriptionReconciler) findObjects(sourceKey string, destinationKey string) handler.MapFunc {
return func(obj client.Object) []reconcile.Request {
sourceList := &v1alpha1.ComponentSubscriptionList{}
if err := r.List(context.Background(), sourceList, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(sourceKey, client.ObjectKeyFromObject(obj).String()),
}); err != nil {
return []reconcile.Request{}
}

destinationList := &v1alpha1.ComponentSubscriptionList{}
if err := r.List(context.Background(), destinationList, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(destinationKey, client.ObjectKeyFromObject(obj).String()),
}); err != nil {
return []reconcile.Request{}
}

// deduplicate the two secret lists
requestMap := make(map[reconcile.Request]struct{})
for _, item := range sourceList.Items {
requestMap[reconcile.Request{
NamespacedName: types.NamespacedName{
Name: item.GetName(),
Namespace: item.GetNamespace(),
},
}] = struct{}{}
}

for _, item := range destinationList.Items {
requestMap[reconcile.Request{
NamespacedName: types.NamespacedName{
Name: item.GetName(),
Namespace: item.GetNamespace(),
},
}] = struct{}{}
}

requests := make([]reconcile.Request, len(requestMap))
for k := range requestMap {
requests = append(requests, k)
}

return requests
}
}

//+kubebuilder:rbac:groups=delivery.ocm.software,resources=componentsubscriptions,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -44,7 +144,6 @@ type ComponentSubscriptionReconciler struct {
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *ComponentSubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
logger := log.FromContext(ctx)
obj := &v1alpha1.ComponentSubscription{}
if err = r.Get(ctx, req.NamespacedName, obj); err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -54,12 +153,7 @@ func (r *ComponentSubscriptionReconciler) Reconcile(ctx context.Context, req ctr
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

logger = logger.WithValues("subscription", klog.KObj(obj))
logger.V(4).Info("starting reconcile loop")

if obj.DeletionTimestamp != nil {
logger.Info("subscription is being deleted...")

return
}

Expand All @@ -69,47 +163,64 @@ func (r *ComponentSubscriptionReconciler) Reconcile(ctx context.Context, req ctr

// Always attempt to patch the object and status after each reconciliation.
defer func() {
if perr := patchHelper.Patch(ctx, obj); perr != nil {
err = errors.Join(err, perr)
// Patching has not been set up, or the controller errored earlier.
if patchHelper == nil {
return
}

if derr := status.UpdateStatus(ctx, patchHelper, obj, r.EventRecorder, obj.GetRequeueAfter()); derr != nil {
err = errors.Join(err, derr)
}
}()

// Starts the progression by setting ReconcilingCondition.
// This will be checked in defer.
// Should only be deleted on a success.
rreconcile.ProgressiveStatus(false, obj, meta.ProgressingReason, "reconciliation in progress for resource: %s", obj.Name)

return r.reconcile(ctx, obj)
}

func (r *ComponentSubscriptionReconciler) reconcile(ctx context.Context, obj *v1alpha1.ComponentSubscription) (ctrl.Result, error) {
logger := log.FromContext(ctx)
func (r *ComponentSubscriptionReconciler) reconcile(ctx context.Context, obj *v1alpha1.ComponentSubscription) (_ ctrl.Result, err error) {
if obj.Generation != obj.Status.ObservedGeneration {
rreconcile.ProgressiveStatus(
false,
obj,
meta.ProgressingReason,
"processing object: new generation %d -> %d",
obj.Status.ObservedGeneration,
obj.Generation,
)
}

octx, err := r.OCMClient.CreateAuthenticatedOCMContext(ctx, obj)
if err != nil {
err := fmt.Errorf("failed to authenticate OCM context: %w", err)
conditions.MarkFalse(obj, meta.ReadyCondition, v1alpha1.AuthenticationFailedReason, err.Error())
status.MarkAsStalled(r.EventRecorder, obj, v1alpha1.AuthenticationFailedReason, err.Error())

return ctrl.Result{}, err
return ctrl.Result{}, nil
}

version, err := r.OCMClient.GetLatestSourceComponentVersion(ctx, octx, obj)
if err != nil {
err := fmt.Errorf("failed to get latest component version: %w", err)
conditions.MarkFalse(obj, meta.ReadyCondition, v1alpha1.PullingLatestVersionFailedReason, err.Error())
status.MarkNotReady(r.EventRecorder, obj, v1alpha1.PullingLatestVersionFailedReason, err.Error())

// we don't want to fail but keep searching until it's there. But we do mark the subscription as failed.
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
}
logger.V(4).Info("got newest version from component", "version", version)

// Because of the predicate, this subscription will be reconciled again once there is an update to its status field.
if version == obj.Status.LastAppliedVersion {
logger.Info("latest version and last applied version are a match and not empty")
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "Reconciliation success")
r.markAsDone(obj)

return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
}

latestSourceComponentVersion, err := semver.NewVersion(version)
if err != nil {
err := fmt.Errorf("failed to parse source component version: %w", err)
conditions.MarkFalse(obj, meta.ReadyCondition, v1alpha1.SemverConversionFailedReason, err.Error())
status.MarkNotReady(r.EventRecorder, obj, v1alpha1.SemverConversionFailedReason, err.Error())

return ctrl.Result{}, err
}
Expand All @@ -122,16 +233,13 @@ func (r *ComponentSubscriptionReconciler) reconcile(ctx context.Context, obj *v1
lastAppliedVersion, err := semver.NewVersion(lastAppliedOriginal)
if err != nil {
err := fmt.Errorf("failed to parse latest version: %w", err)
conditions.MarkFalse(obj, meta.ReadyCondition, v1alpha1.SemverConversionFailedReason, err.Error())
status.MarkNotReady(r.EventRecorder, obj, v1alpha1.SemverConversionFailedReason, err.Error())

return ctrl.Result{}, err
}

logger.V(4).Info("latest applied version is", "version", lastAppliedVersion.Original())

if latestSourceComponentVersion.LessThan(lastAppliedVersion) || latestSourceComponentVersion.Equal(lastAppliedVersion) {
logger.Info("no new version found", "version", latestSourceComponentVersion.Original(), "latest", lastAppliedVersion.Original())
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "Reconciliation success")
r.markAsDone(obj)

return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
}
Expand All @@ -142,48 +250,42 @@ func (r *ComponentSubscriptionReconciler) reconcile(ctx context.Context, obj *v1
sourceComponentVersion, err := r.OCMClient.GetComponentVersion(ctx, octx, obj, latestSourceComponentVersion.Original())
if err != nil {
err := fmt.Errorf("failed to get latest component version: %w", err)
conditions.MarkFalse(obj, meta.ReadyCondition, v1alpha1.GetComponentDescriptorFailedReason, err.Error())
status.MarkNotReady(r.EventRecorder, obj, v1alpha1.GetComponentDescriptorFailedReason, err.Error())

return ctrl.Result{}, err
}

defer func() {
if err := sourceComponentVersion.Close(); err != nil {
logger.Error(err, "failed to close source component version, context might be leaking...")
if cerr := sourceComponentVersion.Close(); cerr != nil {
err = errors.Join(err, cerr)
}
}()

logger.V(4).Info("pulling", "component-name", sourceComponentVersion.GetName())

if obj.Spec.Destination != nil {
rreconcile.ProgressiveStatus(false, obj, meta.ProgressingReason, "transferring component to target repository: %s", obj.Spec.Destination.URL)

if err := r.OCMClient.TransferComponent(ctx, octx, obj, sourceComponentVersion, latestSourceComponentVersion.Original()); err != nil {
err := fmt.Errorf("failed to transfer components: %w", err)
conditions.MarkFalse(obj, meta.ReadyCondition, v1alpha1.TransferFailedReason, err.Error())
status.MarkNotReady(r.EventRecorder, obj, v1alpha1.TransferFailedReason, err.Error())

logger.Error(err, "transferring components failed")
return ctrl.Result{}, err
}

obj.Status.ReplicatedRepositoryURL = obj.Spec.Destination.URL
} else {
logger.Info("skipping transferring as no destination is provided for source component", "component-name", sourceComponentVersion.GetName())

obj.Status.ReplicatedRepositoryURL = obj.Spec.Source.URL
}

// Update the replicated version to the latest version
obj.Status.LastAppliedVersion = latestSourceComponentVersion.Original()

logger.Info("resource is ready")
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "Reconciliation success")
r.markAsDone(obj)

// Always requeue to constantly check for new versions.
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *ComponentSubscriptionReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.ComponentSubscription{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
func (r *ComponentSubscriptionReconciler) markAsDone(obj *v1alpha1.ComponentSubscription) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "Reconciliation success")
event.New(r.EventRecorder, obj, eventv1.EventSeverityInfo, "Reconciliation success", nil)
}
12 changes: 9 additions & 3 deletions controllers/componentsubscription_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/open-component-model/ocm/pkg/contexts/ocm"
Expand Down Expand Up @@ -187,12 +188,17 @@ func TestComponentSubscriptionReconciler(t *testing.T) {
cv := tt.subscription()
client := env.FakeKubeClient(WithObjets(cv))
fakeOcm := &fakes.MockFetcher{}
recorder := &record.FakeRecorder{
Events: make(chan string, 32),
IncludeObject: true,
}
tt.setupMock(fakeOcm)

cvr := ComponentSubscriptionReconciler{
Scheme: env.scheme,
Client: client,
OCMClient: fakeOcm,
Scheme: env.scheme,
Client: client,
OCMClient: fakeOcm,
EventRecorder: recorder,
}

_, err := cvr.Reconcile(context.Background(), ctrl.Request{
Expand Down
Loading

0 comments on commit 9b57691

Please sign in to comment.