Skip to content

Commit

Permalink
Collect data for field managers during Manifest reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomasz-Smelcerz-SAP committed Feb 19, 2025
1 parent cd403d8 commit fa2aa41
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 4 deletions.
185 changes: 185 additions & 0 deletions internal/manifest/skrresources/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package skrresources

import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"regexp"
"slices"
"strconv"
"strings"
"time"

"github.com/jellydator/ttlcache/v3"
apimetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kyma-project/lifecycle-manager/api/shared"
"github.com/kyma-project/lifecycle-manager/internal"
"github.com/kyma-project/lifecycle-manager/internal/manifest/manifestclient"
)

const (
knownManagersDefault = string(manifestclient.DefaultFieldOwner) + ";" +
shared.OperatorName + ";" +
"k3s" // Applied in k3s environments.
knownManagersEnvVar = "KLM_EXPERIMENTAL_KNOWN_MANAGERS"
knownManagersRegexp = `^[a-zA-Z][a-zA-Z0-9.:_/-]{1,127}$`

frequencyCacheTTLDefault = 60 * 5 // 5 minutes
frequencyCacheTTLEnvVar = "KLM_EXPERIMENTAL_FREQUENCY_CACHE_TTL"
frequencyCacheTTLRegexp = `^[1-9][0-9]{1,3}$`

managedFieldsAnalysisLabelEnvVar = "KLM_EXPERIMENTAL_MANAGED_FIELDS_ANALYSIS_LABEL"
)

var (
allowedManagers = getAllowedManagers() //nolint:gochecknoglobals // list of managers is a global configuration
emitCache = newEmitCache() //nolint:gochecknoglobals // singleton cache is used to prevent emitting the same log multiple times in a short period
)

type LogCollectorEntry struct {
ObjectName string `json:"objectName"`
ObjectNamespace string `json:"objectNamespace"`
ObjectGVK string `json:"objectGvk"`
ManagedFields []apimetav1.ManagedFieldsEntry `json:"managedFields"`
}

// Implements ManagedFieldsCollector interface, emits the colloected data to the log stream.
type LogCollector struct {
key string
owner client.FieldOwner
entries []LogCollectorEntry
}

func NewLogCollector(key string, owner client.FieldOwner) *LogCollector {
return &LogCollector{
key: key,
owner: owner,
entries: []LogCollectorEntry{},
}
}

func (c *LogCollector) Collect(ctx context.Context, remoteObj client.Object) {
managedFields := remoteObj.GetManagedFields()
for _, mf := range managedFields {
if isUnknownManager(mf.Manager) {
newEntry := LogCollectorEntry{
ObjectName: remoteObj.GetName(),
ObjectNamespace: remoteObj.GetNamespace(),
ObjectGVK: remoteObj.GetObjectKind().GroupVersionKind().String(),
ManagedFields: slices.Clone(remoteObj.GetManagedFields()),
}
c.entries = append(c.entries, newEntry)
return
}
}
}

func (c *LogCollector) Emit(ctx context.Context) error {
if len(c.entries) > 0 {
if emitCache.Has(c.key) {
logger := logf.FromContext(ctx, "owner", c.owner)
logger.V(internal.TraceLogLevel).Info("Unknown managers detection skipped (frequency)")
return nil
}
emitCache.Set(c.key, true, ttlcache.DefaultTTL)

jsonSer, err := json.MarshalIndent(c.entries, "", " ")
if err != nil {
return fmt.Errorf("failed to serialize managed field data: %w", err)
}
logData, err := compressAndBase64(jsonSer)
if err != nil {
return err
}

logger := logf.FromContext(ctx, "owner", c.owner)
logger.V(internal.TraceLogLevel).Info("Unknown managers detected", "base64gzip", logData)
}
return nil
}

// compressAndBase64 compresses the input byte slice using gzip and encodes it to base64 so that it can be logged as a string.
func compressAndBase64(in []byte) (string, error) {
var buf bytes.Buffer
archive := gzip.NewWriter(&buf)

_, err := archive.Write(in)
if err != nil {
return "", fmt.Errorf("failed to write to gzip archive: %w", err)
}

if err := archive.Close(); err != nil {
return "", fmt.Errorf("failed to close gzip archive: %w", err)
}

return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
}

func isUnknownManager(manager string) bool {
return !slices.Contains(allowedManagers, manager)
}

// allowedManagers returns either a list configured in the KLM_RECONCILECONFIG_KNOWN_MANAGERS environment variable or the default list.
// The values must be separated by semicolons and are case-sensitive!
func getAllowedManagers() []string {
configured := os.Getenv(knownManagersEnvVar)
if configured == "" {
return splitBySemicolons(knownManagersDefault)
} else {
rxp := regexp.MustCompile(knownManagersRegexp)
configuredValues := splitBySemicolons(configured)
res := []string{}
for _, name := range configuredValues {
if rxp.MatchString(name) {
res = append(res, name)
}
}
return res
}
}

func getCacheTTL() int {
var res int = frequencyCacheTTLDefault

if configured := os.Getenv(frequencyCacheTTLEnvVar); configured != "" {
rxp := regexp.MustCompile(frequencyCacheTTLRegexp)
if rxp.MatchString(configured) {
if parsed, err := strconv.Atoi(configured); err == nil {
res = parsed
}
}
}

return res
}

func newEmitCache() *ttlcache.Cache[string, bool] {
cache := ttlcache.New[string, bool](ttlcache.WithTTL[string, bool](time.Duration(getCacheTTL()) * time.Second))
go cache.Start()
return cache
}

func splitBySemicolons(value string) []string {
return strings.Split(value, ";")
}

// Implements ManagedFieldsCollector interface, does nothing.
type nopCollector struct{}

func (c nopCollector) Collect(ctx context.Context, obj client.Object) {
}

func (c nopCollector) Emit(ctx context.Context) error {
return nil
}

func getManagedFieldsAnalysisLabel() string {
return os.Getenv(managedFieldsAnalysisLabelEnvVar)
}
83 changes: 83 additions & 0 deletions internal/manifest/skrresources/collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package skrresources //nolint:testpackage // testing package internals

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetAllowedManagers(t *testing.T) {
tests := []struct {
name string
envValue string
want []string
}{
{
name: "default managers",
envValue: "",
want: []string{"declarative.kyma-project.io/applier", "lifecycle-manager", "k3s"},
},
{
name: "single manager in env",
envValue: "manager1",
want: []string{"manager1"},
},
{
name: "multiple managers in env",
envValue: "manager1;manager2;some-manager:3",
want: []string{"manager1", "manager2", "some-manager:3"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.envValue != "" {
t.Setenv(knownManagersEnvVar, tt.envValue)
}
assert.Equal(t, tt.want, getAllowedManagers())
})
}
}

func TestGetCacheTTL(t *testing.T) {
tests := []struct {
name string
envValue string
want int
}{
{
name: "default TTL",
envValue: "",
want: 300,
},
{
name: "custom TTL",
envValue: "123",
want: 123,
},
{
name: "invalid value is ignored, default TTL is returned",
envValue: "abc",
want: 300,
},
{
name: "zero is invalid, default TTL is returned",
envValue: "0",
want: 300,
},
{
name: "Negative value is ignored, default TTL is returned",
envValue: "-123",
want: 300,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.envValue != "" {
t.Setenv(frequencyCacheTTLEnvVar, tt.envValue)
}
assert.Equal(t, tt.want, getCacheTTL())
})
}
}
27 changes: 25 additions & 2 deletions internal/manifest/skrresources/ssa.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,40 @@ type SSA interface {
Run(ctx context.Context, resourceInfo []*resource.Info) error
}

type ManagedFieldsCollector interface {
// Collect collects managed fields data from the single object
Collect(ctx context.Context, obj client.Object)
// Emit emits collected data to some backing store
Emit(ctx context.Context) error
}

type ConcurrentDefaultSSA struct {
clnt client.Client
owner client.FieldOwner
versioner machineryruntime.GroupVersioner
converter machineryruntime.ObjectConvertor
collector ManagedFieldsCollector
}

func ConcurrentSSA(clnt client.Client, owner client.FieldOwner) *ConcurrentDefaultSSA {
func ConcurrentSSA(clnt client.Client, owner client.FieldOwner, managedFieldsCollector ManagedFieldsCollector) *ConcurrentDefaultSSA {
return &ConcurrentDefaultSSA{
clnt: clnt, owner: owner,
clnt: clnt,
owner: owner,
versioner: schema.GroupVersions(clnt.Scheme().PrioritizedVersionsAllGroups()),
converter: clnt.Scheme(),
collector: managedFieldsCollector,
}
}

//nolint:ireturn // interface return is required here
func (c *ConcurrentDefaultSSA) managedFieldsCollector() ManagedFieldsCollector {
if c.collector != nil {
return c.collector
}

return nopCollector{}
}

func (c *ConcurrentDefaultSSA) Run(ctx context.Context, resources []*resource.Info) error {
ssaStart := time.Now()
logger := logf.FromContext(ctx, "owner", c.owner)
Expand Down Expand Up @@ -70,6 +89,9 @@ func (c *ConcurrentDefaultSSA) Run(ctx context.Context, resources []*resource.In
return errors.Join(errs...)
}
logger.V(internal.DebugLogLevel).Info("ServerSideApply finished", "time", ssaFinish)
if err := c.managedFieldsCollector().Emit(ctx); err != nil {
logger.V(internal.DebugLogLevel).Error(err, "error emitting data of unknown field managers")
}
return nil
}

Expand Down Expand Up @@ -123,6 +145,7 @@ func (c *ConcurrentDefaultSSA) serverSideApplyResourceInfo(
)
}

c.managedFieldsCollector().Collect(ctx, obj)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/manifest/skrresources/ssa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestConcurrentSSA(t *testing.T) {
t.Run(
testCase.name, func(t *testing.T) {
t.Parallel()
ssa := skrresources.ConcurrentSSA(testCase.ssa.clnt, testCase.ssa.owner)
ssa := skrresources.ConcurrentSSA(testCase.ssa.clnt, testCase.ssa.owner, nil)
if err := ssa.Run(context.Background(), testCase.apply); err != nil {
require.ErrorIs(t, err, testCase.err)
}
Expand Down
19 changes: 18 additions & 1 deletion internal/manifest/skrresources/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ func SyncResources(ctx context.Context, skrClient client.Client, manifest *v1bet
) error {
manifestStatus := manifest.GetStatus()

if err := ConcurrentSSA(skrClient, manifestclient.DefaultFieldOwner).Run(ctx, target); err != nil {
var managedFieldsCollector ManagedFieldsCollector
if managedFieldsAnalysisEnabledFor(manifest) {
managedFieldsCollector = NewLogCollector(string(manifest.GetUID()), manifestclient.DefaultFieldOwner)
}

if err := ConcurrentSSA(skrClient, manifestclient.DefaultFieldOwner, managedFieldsCollector).Run(ctx, target); err != nil {
manifest.SetStatus(manifestStatus.WithState(shared.StateError).WithErr(err))
return err
}
Expand Down Expand Up @@ -59,3 +64,15 @@ func HasDiff(oldResources []shared.Resource, newResources []shared.Resource) boo
}
return false
}

// managedFieldsAnalysisEnabledFor checks if managed fields detection is enabled for the given manifest.
// The detection is enabled by default, but can be controlled by setting a specific label on the manifest CR.
func managedFieldsAnalysisEnabledFor(obj *v1beta2.Manifest) bool {
detectionLabelName := getManagedFieldsAnalysisLabel()
if detectionLabelName == "" {
return true
}

_, found := obj.GetLabels()[detectionLabelName]
return found
}

0 comments on commit fa2aa41

Please sign in to comment.