diff --git a/changelog/fragments/1704824761-fleet-config-change-logging.yaml b/changelog/fragments/1704824761-fleet-config-change-logging.yaml new file mode 100644 index 00000000000..94a47aa7b67 --- /dev/null +++ b/changelog/fragments/1704824761-fleet-config-change-logging.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: fleet-config-change-logging + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: Coordinator + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index bfd14a91381..0a5450ddada 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "reflect" "strings" "time" @@ -298,6 +299,28 @@ type managerChans struct { upgradeMarkerUpdate <-chan upgrade.UpdateMarker } +// diffCheck is a container used by checkAndLogUpdate() +type diffCheck struct { + inNew bool + inLast bool + updated bool +} + +// UpdateStats reports the diff of a component update. +// This is primarily used as a log message, and exported in case it's needed elsewhere. +type UpdateStats struct { + Components UpdateComponentChange `json:"components"` + Outputs UpdateComponentChange `json:"outputs"` +} + +// UpdateComponentChange reports stats for changes to a particular part of a config. +type UpdateComponentChange struct { + Added []string `json:"added,omitempty"` + Removed []string `json:"removed,omitempty"` + Updated []string `json:"updated,omitempty"` + Count int `json:"count,omitempty"` +} + // New creates a new coordinator. func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo *info.AgentInfo, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capabilities, monitorMgr MonitorManager, isManaged bool, modifiers ...ComponentsModifier) *Coordinator { var fleetState cproto.State @@ -1216,10 +1239,149 @@ func (c *Coordinator) generateComponentModel() (err error) { // If we made it this far, update our internal derived values and // return with no error c.derivedConfig = cfg + + lastComponentModel := c.componentModel c.componentModel = comps + + c.checkAndLogUpdate(lastComponentModel) + return nil } +// compares the last component model with an updated model, +// logging any differences. +func (c *Coordinator) checkAndLogUpdate(lastComponentModel []component.Component) { + if lastComponentModel == nil { + c.logger.Debugf("Received initial component update; total of %d components", len(c.componentModel)) + return + } + + type compCheck struct { + inCurrent bool + inLast bool + + diffUnits map[string]diffCheck + } + + lastCompMap := convertComponentListToMap(lastComponentModel) + currentCompMap := convertComponentListToMap(c.componentModel) + + compDiffMap := map[string]compCheck{} + outDiffMap := map[string]diffCheck{} + // kinda-callbacks for dealing with output logic + foundInLast := func(outName string) { + if outDiff, ok := outDiffMap[outName]; ok { + outDiff.inLast = true + outDiffMap[outName] = outDiff + } else { + outDiffMap[outName] = diffCheck{inLast: true} + } + } + + foundInUpdated := func(outName string) { + if outDiff, ok := outDiffMap[outName]; ok { + outDiff.inNew = true + outDiffMap[outName] = outDiff + } else { + outDiffMap[outName] = diffCheck{inNew: true} + } + } + + // diff the maps + + // find added & updated components + for id, comp := range currentCompMap { + // check output + foundInUpdated(comp.OutputType) + // compare with last state + diff := compCheck{inCurrent: true} + if lastComp, ok := lastCompMap[id]; ok { + diff.inLast = true + // if the unit is in both the past and previous, check for updated units + diff.diffUnits = diffUnitList(lastComp.Units, comp.Units) + foundInLast(lastComp.OutputType) + // a bit of optimization: after we're done, we'll need to iterate over lastCompMap to fetch removed units, + // so delete items we don't need to iterate over + delete(lastCompMap, id) + } + compDiffMap[id] = diff + } + + // find removed components + // if something is still in this map, that means it's only in this map + for id, comp := range lastCompMap { + compDiffMap[id] = compCheck{inLast: true} + foundInLast(comp.OutputType) + } + + addedList := []string{} + removedList := []string{} + + formattedUpdated := []string{} + + // reduced to list of added/removed outputs + addedOutputs := []string{} + removedOutputs := []string{} + + // take our diff map and format everything for output + for id, diff := range compDiffMap { + if diff.inLast && !diff.inCurrent { + removedList = append(removedList, id) + } + if !diff.inLast && diff.inCurrent { + addedList = append(addedList, id) + } + // format a user-readable list of diffs + if diff.inLast && diff.inCurrent { + units := []string{} + for unitId, state := range diff.diffUnits { + action := "" + if state.inLast && !state.inNew { + action = "removed" + } + if state.inNew && !state.inLast { + action = "added" + } + if state.updated { + action = "updated" + } + if action != "" { + units = append(units, fmt.Sprintf("(%s: %s)", unitId, action)) + } + } + if len(units) > 0 { + formatted := fmt.Sprintf("%s: %v", id, units) + formattedUpdated = append(formattedUpdated, formatted) + } + } + } + + // format outputs + for output, comp := range outDiffMap { + if comp.inLast && !comp.inNew { + removedOutputs = append(removedOutputs, output) + } + if !comp.inLast && comp.inNew { + addedOutputs = append(addedOutputs, output) + } + } + + logStruct := UpdateStats{ + Components: UpdateComponentChange{ + Added: addedList, + Removed: removedList, + Count: len(c.componentModel), + Updated: formattedUpdated, + }, + Outputs: UpdateComponentChange{ + Added: addedOutputs, + Removed: removedOutputs, + }, + } + + c.logger.Infow("component model updated", "changes", logStruct) +} + // Filter any inputs and outputs in the generated component model // based on whether they're excluded by the capabilities config func (c *Coordinator) filterByCapabilities(comps []component.Component) []component.Component { @@ -1243,6 +1405,48 @@ func (c *Coordinator) filterByCapabilities(comps []component.Component) []compon return result } +// helpers for checkAndLogUpdate + +func convertUnitListToMap(unitList []component.Unit) map[string]component.Unit { + unitMap := map[string]component.Unit{} + for _, c := range unitList { + unitMap[c.ID] = c + } + return unitMap +} + +func convertComponentListToMap(compList []component.Component) map[string]component.Component { + compMap := map[string]component.Component{} + for _, c := range compList { + compMap[c.ID] = c + } + return compMap +} + +func diffUnitList(old, new []component.Unit) map[string]diffCheck { + oldMap := convertUnitListToMap(old) + newMap := convertUnitListToMap(new) + + diffMap := map[string]diffCheck{} + // find new and updated units + for id, newUnits := range newMap { + diff := diffCheck{inNew: true} + if oldUnit, ok := oldMap[id]; ok { + diff.inLast = true + if newUnits.Config != nil && oldUnit.Config != nil && newUnits.Config.GetSource() != nil && oldUnit.Config.GetSource() != nil { + diff.updated = !reflect.DeepEqual(newUnits.Config.GetSource().AsMap(), oldUnit.Config.GetSource().AsMap()) + } + delete(oldMap, id) + } + diffMap[id] = diff + } + // find removed units + for id := range oldMap { + diffMap[id] = diffCheck{inLast: true} + } + return diffMap +} + // collectManagerErrors listens on the shutdown channels for the // runtime, config, and vars managers as well as the upgrade marker // watcher and waits for up to the specified timeout for them to diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 49770bc1e97..3cfeddb32af 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -15,7 +15,7 @@ import ( "testing" "time" - "github.com/elastic/elastic-agent-client/v7/pkg/client" + "google.golang.org/protobuf/types/known/structpb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -24,6 +24,8 @@ import ( "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec" @@ -97,6 +99,231 @@ func waitForState( } } +func TestComponentUpdateDiff(t *testing.T) { + + err := logp.DevelopmentSetup(logp.ToObserverOutput()) + require.NoError(t, err) + + cases := []struct { + name string + old []component.Component + new []component.Component + logtest func(t *testing.T, logs UpdateStats) + }{ + { + name: "test-basic-removed", + old: []component.Component{ + { + ID: "component-one", + OutputType: "elasticsearch", + }, + { + ID: "component-two", + OutputType: "kafka", + }, + }, + new: []component.Component{ + { + ID: "component-one", + OutputType: "elasticsearch", + }, + }, + logtest: func(t *testing.T, logs UpdateStats) { + + require.Equal(t, []string{"component-two"}, logs.Components.Removed) + require.Equal(t, []string{"kafka"}, logs.Outputs.Removed) + }, + }, + { + name: "test-added-and-removed", + old: []component.Component{ + { + ID: "component-one", + OutputType: "elasticsearch", + }, + { + ID: "component-two", + OutputType: "kafka", + }, + }, + new: []component.Component{ + { + ID: "component-three", + OutputType: "elasticsearch", + }, + }, + logtest: func(t *testing.T, logs UpdateStats) { + require.Equal(t, 2, len(logs.Components.Removed)) + require.Equal(t, []string{"component-three"}, logs.Components.Added) + require.Equal(t, []string{"kafka"}, logs.Outputs.Removed) + }, + }, + { + name: "test-updated-component", + old: []component.Component{ + { + ID: "component-one", + OutputType: "elasticsearch", + Units: []component.Unit{ + {ID: "unit-one"}, + {ID: "unit-two"}, + {ID: "unit-x"}, + }, + }, + }, + new: []component.Component{ + { + ID: "component-one", + OutputType: "elasticsearch", + Units: []component.Unit{ + {ID: "unit-one"}, + {ID: "unit-two"}, + {ID: "unit-three"}, + }, + }, + }, + logtest: func(t *testing.T, logs UpdateStats) { + require.Contains(t, logs.Components.Updated[0], "unit-three: added") + require.Contains(t, logs.Components.Updated[0], "unit-x: removed") + }, + }, + { + name: "just-change-output", + old: []component.Component{ + { + ID: "component-one", + OutputType: "elasticsearch", + }, + }, + new: []component.Component{ + { + ID: "component-one", + OutputType: "logstash", + }, + }, + logtest: func(t *testing.T, logs UpdateStats) { + require.Equal(t, []string{"elasticsearch"}, logs.Outputs.Removed) + require.Equal(t, []string{"logstash"}, logs.Outputs.Added) + }, + }, + { + name: "config-update", + old: []component.Component{ + { + ID: "component-one", + OutputType: "elasticsearch", + Units: []component.Unit{ + { + ID: "unit-one", + Config: &proto.UnitExpectedConfig{Source: mustNewStruct(t, map[string]interface{}{"example": "value"})}, + }, + }, + }, + }, + new: []component.Component{ + { + ID: "component-one", + OutputType: "elasticsearch", + Units: []component.Unit{ + { + ID: "unit-one", + Config: &proto.UnitExpectedConfig{Source: mustNewStruct(t, map[string]interface{}{"example": "two"})}, + }, + }, + }, + }, + logtest: func(t *testing.T, logs UpdateStats) { + require.NotEmpty(t, logs.Components.Updated) + }, + }, + { + name: "config-no-changes", + old: []component.Component{ + { + ID: "component-one", + OutputType: "elasticsearch", + Units: []component.Unit{ + { + ID: "unit-one", + Config: &proto.UnitExpectedConfig{Source: mustNewStruct(t, map[string]interface{}{"example": "value"})}, + }, + }, + }, + }, + new: []component.Component{ + { + ID: "component-one", + OutputType: "elasticsearch", + Units: []component.Unit{ + { + ID: "unit-one", + Config: &proto.UnitExpectedConfig{Source: mustNewStruct(t, map[string]interface{}{"example": "value"})}, + }, + }, + }, + }, + logtest: func(t *testing.T, logs UpdateStats) { + require.Len(t, logs.Components.Updated, 0) + }, + }, + { + name: "config-source-nil", + old: []component.Component{ + { + ID: "component-one", + OutputType: "elasticsearch", + Units: []component.Unit{ + { + ID: "unit-one", + Config: &proto.UnitExpectedConfig{Id: "test"}, + }, + }, + }, + }, + new: []component.Component{ + { + ID: "component-one", + OutputType: "elasticsearch", + Units: []component.Unit{ + { + ID: "unit-one", + Config: &proto.UnitExpectedConfig{Id: "test"}, + }, + }, + }, + }, + logtest: func(t *testing.T, logs UpdateStats) { + require.Len(t, logs.Components.Updated, 0) + }, + }, + } + + for _, testcase := range cases { + + t.Run(testcase.name, func(t *testing.T) { + testCoord := Coordinator{ + logger: logp.L(), + componentModel: testcase.new, + } + testCoord.checkAndLogUpdate(testcase.old) + + obsLogs := logp.ObserverLogs().TakeAll() + last := obsLogs[len(obsLogs)-1] + + // extract the structured data from the log message + testcase.logtest(t, last.Context[0].Interface.(UpdateStats)) + }) + + } + +} + +func mustNewStruct(t *testing.T, v map[string]interface{}) *structpb.Struct { + str, err := structpb.NewStruct(v) + require.NoError(t, err) + return str +} + func TestCoordinator_State_Starting(t *testing.T) { coordCh := make(chan error) ctx, cancel := context.WithCancel(context.Background())