From 89ae1e04e04e342fb94dfd255209d68291ed00b0 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 27 Feb 2025 10:04:11 -0800 Subject: [PATCH] Add metrics for kernel_tracing provider, fix mutex issue (#42795) (#42894) * add metrics for kernel_tracing provider, fix mutex issue * fix metrics setup, add tests * still tinkering with monitoring * add changelog (cherry picked from commit 5197c4320a262ac850fa8bdb9718c2ba5e54027d) Co-authored-by: Alex K. <8418476+fearful-symmetry@users.noreply.github.com> --- CHANGELOG.next.asciidoc | 1 + .../sessionmd/add_session_metadata.go | 46 ++++++++++++++----- .../sessionmd/add_session_metadata_test.go | 22 +++++++++ .../kerneltracingprovider_linux.go | 32 +++++++++---- .../provider/kerneltracingprovider/metrics.go | 31 +++++++++++++ 5 files changed, 112 insertions(+), 20 deletions(-) create mode 100644 x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/metrics.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4614f4212fae..a2a03a791a93 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -130,6 +130,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403] - hasher: Geneneral improvements and fixes. {pull}41863[41863] - hasher: Add a cached hasher for upcoming backend. {pull}41952[41952] - Split common tty definitions. {pull}42004[42004] +- Fix potential data loss in add_session_metadata. {pull}42795[42795] *Auditbeat* diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go index ed6701e18064..6a5d115a5463 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go @@ -32,6 +32,9 @@ const ( logName = "processor." + processorName procfsType = "procfs" kernelTracingType = "kernel_tracing" + + regNameProcessDB = "processor.add_session_metadata.processdb" + regNameKernelTracing = "processor.add_session_metadata.kernel_tracing" ) // InitializeModule initializes this module. @@ -53,6 +56,31 @@ type addSessionMetadata struct { providerType string } +func genRegistry(reg *monitoring.Registry, base string) *monitoring.Registry { + // if more than one instance of the DB is running, start to increment the metrics keys. + // This is kind of an edge case, but best to handle it so monitoring does not explode. + // This seems like awkward code, but NewRegistry() loves to panic, so we need to be careful. + id := 0 + if reg.GetRegistry(base) != nil { + current := int(instanceID.Load()) + // because we call genRegistry() multiple times, make sure the registry doesn't exist before we iterate the counter + if current > 0 && reg.GetRegistry(fmt.Sprintf("%s.%d", base, current)) == nil { + id = current + } else { + id = int(instanceID.Add(1)) + } + + } + + regName := base + if id > 0 { + regName = fmt.Sprintf("%s.%d", base, id) + } + + metricsReg := reg.NewRegistry(regName) + return metricsReg +} + func New(cfg *cfg.C) (beat.Processor, error) { c := defaultConfig() if err := cfg.Unpack(&c); err != nil { @@ -60,18 +88,10 @@ func New(cfg *cfg.C) (beat.Processor, error) { } logger := logp.NewLogger(logName) - - id := int(instanceID.Add(1)) - regName := "processor.add_session_metadata.processdb" - // if more than one instance of the DB is running, start to increment the metrics keys. - if id > 1 { - regName = fmt.Sprintf("%s.%d", regName, id) - } - metricsReg := monitoring.Default.NewRegistry(regName) - + procDBReg := genRegistry(monitoring.Default, regNameProcessDB) ctx, cancel := context.WithCancel(context.Background()) reader := procfs.NewProcfsReader(*logger) - db, err := processdb.NewDB(ctx, metricsReg, reader, logger, c.DBReaperPeriod, c.ReapProcesses) + db, err := processdb.NewDB(ctx, procDBReg, reader, logger, c.DBReaperPeriod, c.ReapProcesses) if err != nil { cancel() return nil, fmt.Errorf("failed to create DB: %w", err) @@ -82,7 +102,8 @@ func New(cfg *cfg.C) (beat.Processor, error) { switch c.Backend { case "auto": - p, err = kerneltracingprovider.NewProvider(ctx, logger) + procDBReg := genRegistry(monitoring.Default, regNameKernelTracing) + p, err = kerneltracingprovider.NewProvider(ctx, logger, procDBReg) if err != nil { // Most likely cause of error is not supporting ebpf or kprobes on system, try procfs backfilledPIDs := db.ScrapeProcfs() @@ -108,7 +129,8 @@ func New(cfg *cfg.C) (beat.Processor, error) { } pType = procfsType case "kernel_tracing": - p, err = kerneltracingprovider.NewProvider(ctx, logger) + procDBReg := genRegistry(monitoring.Default, regNameKernelTracing) + p, err = kerneltracingprovider.NewProvider(ctx, logger, procDBReg) if err != nil { cancel() return nil, fmt.Errorf("failed to create kernel_tracing provider: %w", err) diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go index 422af4c935c2..d0fa6fad0665 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go @@ -8,6 +8,7 @@ package sessionmd import ( "context" + "fmt" "testing" "time" @@ -339,6 +340,27 @@ var ( logger = logp.NewLogger("add_session_metadata_test") ) +func TestMetricsSetup(t *testing.T) { + // init a metrics registry multiple times with the same name, ensure we don't panic, and the names are correct + reg := monitoring.NewRegistry() + firstName := "test.metrics" + secondName := "other.stuff" + genRegistry(reg, firstName) + require.NotNil(t, reg.Get(firstName)) + + genRegistry(reg, firstName) + require.NotNil(t, reg.Get(fmt.Sprintf("%s.1", firstName))) + + genRegistry(reg, secondName) + require.NotNil(t, reg.Get(secondName)) + require.Nil(t, reg.Get(fmt.Sprintf("%s.1", secondName))) + + genRegistry(reg, secondName) + require.NotNil(t, reg.Get(secondName)) + require.NotNil(t, reg.Get(fmt.Sprintf("%s.1", secondName))) + require.Nil(t, reg.Get(fmt.Sprintf("%s.2", secondName))) +} + func TestEnrich(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15) defer cancel() diff --git a/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_linux.go b/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_linux.go index e57c5d693557..c94e6bb1dec9 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_linux.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_linux.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/provider" "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" ) type prvdr struct { @@ -82,7 +83,7 @@ func readPIDNsInode() (uint64, error) { } // NewProvider returns a new instance of kerneltracingprovider -func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, error) { +func NewProvider(ctx context.Context, logger *logp.Logger, reg *monitoring.Registry) (provider.Provider, error) { attr := quark.DefaultQueueAttr() attr.Flags = quark.QQ_ALL_BACKENDS | quark.QQ_ENTRY_LEADER | quark.QQ_NO_SNAPSHOT qq, err := quark.OpenQueue(attr, 64) @@ -90,6 +91,8 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e return nil, fmt.Errorf("open queue: %w", err) } + procMetrics := NewStats(reg) + p := &prvdr{ ctx: ctx, logger: logger, @@ -102,7 +105,10 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e backoffSkipped: 0, } - go func(ctx context.Context, qq *quark.Queue, logger *logp.Logger, p *prvdr) { + go func(ctx context.Context, qq *quark.Queue, logger *logp.Logger, p *prvdr, stats *Stats) { + + lastUpdate := time.Now() + defer qq.Close() for ctx.Err() == nil { p.qqMtx.Lock() @@ -112,6 +118,19 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e logger.Errorw("get events from quark, no more process enrichment from this processor will be done", "error", err) break } + if time.Since(lastUpdate) > time.Second*5 { + p.qqMtx.Lock() + metrics := qq.Stats() + p.qqMtx.Unlock() + + stats.Aggregations.Set(metrics.Aggregations) + stats.Insertions.Set(metrics.Insertions) + stats.Lost.Set(metrics.Lost) + stats.NonAggregations.Set(metrics.NonAggregations) + stats.Removals.Set(metrics.Removals) + lastUpdate = time.Now() + } + if len(events) == 0 { err = qq.Block() if err != nil { @@ -120,7 +139,7 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e } } } - }(ctx, qq, logger, p) + }(ctx, qq, logger, p, procMetrics) bootID, err = readBootID() if err != nil { @@ -150,11 +169,8 @@ const ( // does not exceed a reasonable threshold that would delay all other events processed by auditbeat. When in the backoff state, enrichment // will proceed without waiting for the process data to exist in the cache, likely resulting in missing enrichment data. func (p *prvdr) Sync(_ *beat.Event, pid uint32) error { - p.qqMtx.Lock() - defer p.qqMtx.Unlock() - // If pid is already in qq, return immediately - if _, found := p.qq.Lookup(int(pid)); found { + if _, found := p.lookupLocked(pid); found { return nil } @@ -169,7 +185,7 @@ func (p *prvdr) Sync(_ *beat.Event, pid uint32) error { nextWait := 5 * time.Millisecond for { waited := time.Since(start) - if _, found := p.qq.Lookup(int(pid)); found { + if _, found := p.lookupLocked(pid); found { p.logger.Debugw("got process that was missing ", "waited", waited) p.combinedWait = p.combinedWait + waited return nil diff --git a/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/metrics.go b/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/metrics.go new file mode 100644 index 000000000000..3f7378306c26 --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/metrics.go @@ -0,0 +1,31 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux && (amd64 || arm64) && cgo + +package kerneltracingprovider + +import ( + "github.com/elastic/elastic-agent-libs/monitoring" +) + +// / Stats tracks the quark internal stats, which are integrated into the beats monitoring runtime +type Stats struct { + Insertions *monitoring.Uint + Removals *monitoring.Uint + Aggregations *monitoring.Uint + NonAggregations *monitoring.Uint + Lost *monitoring.Uint +} + +// / NewStats creates a new stats object +func NewStats(reg *monitoring.Registry) *Stats { + return &Stats{ + Insertions: monitoring.NewUint(reg, "insertions"), + Removals: monitoring.NewUint(reg, "removals"), + Aggregations: monitoring.NewUint(reg, "aggregations"), + NonAggregations: monitoring.NewUint(reg, "nonaggregations"), + Lost: monitoring.NewUint(reg, "lost"), + } +}