Skip to content

Commit

Permalink
Add metrics for kernel_tracing provider, fix mutex issue (#42795) (#42894)

Browse files Browse the repository at this point in the history

* add metrics for kernel_tracing provider, fix mutex issue

* fix metrics setup, add tests

* still tinkering with monitoring

* add changelog

(cherry picked from commit 5197c43)

Co-authored-by: Alex K. <8418476+fearful-symmetry@users.noreply.github.com>
  • Loading branch information
mergify[bot] and fearful-symmetry authored Feb 27, 2025
1 parent fcda890 commit 89ae1e0
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- hasher: General improvements and fixes. {pull}41863[41863]
- hasher: Add a cached hasher for upcoming backend. {pull}41952[41952]
- Split common tty definitions. {pull}42004[42004]
- Fix potential data loss in add_session_metadata. {pull}42795[42795]

*Auditbeat*

Expand Down
46 changes: 34 additions & 12 deletions x-pack/auditbeat/processors/sessionmd/add_session_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const (
logName = "processor." + processorName
procfsType = "procfs"
kernelTracingType = "kernel_tracing"

regNameProcessDB = "processor.add_session_metadata.processdb"
regNameKernelTracing = "processor.add_session_metadata.kernel_tracing"
)

// InitializeModule initializes this module.
Expand All @@ -53,25 +56,42 @@ type addSessionMetadata struct {
providerType string
}

// genRegistry creates a uniquely named child registry under reg. Registry
// names must be unique — NewRegistry panics on a duplicate — so when another
// processor instance has already claimed `base`, a numbered sibling
// (`base.N`) is created instead. Running more than one instance is an edge
// case, but monitoring must not explode (or panic) when it happens.
func genRegistry(reg *monitoring.Registry, base string) *monitoring.Registry {
	name := base
	if reg.GetRegistry(base) != nil {
		// genRegistry() is called multiple times per processor instance, so
		// reuse the current counter value while its registry slot is still
		// free; only bump the shared counter when that slot is taken too.
		current := int(instanceID.Load())
		id := current
		if current <= 0 || reg.GetRegistry(fmt.Sprintf("%s.%d", base, current)) != nil {
			id = int(instanceID.Add(1))
		}
		name = fmt.Sprintf("%s.%d", base, id)
	}

	return reg.NewRegistry(name)
}

func New(cfg *cfg.C) (beat.Processor, error) {
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

logger := logp.NewLogger(logName)

id := int(instanceID.Add(1))
regName := "processor.add_session_metadata.processdb"
// if more than one instance of the DB is running, start to increment the metrics keys.
if id > 1 {
regName = fmt.Sprintf("%s.%d", regName, id)
}
metricsReg := monitoring.Default.NewRegistry(regName)

procDBReg := genRegistry(monitoring.Default, regNameProcessDB)
ctx, cancel := context.WithCancel(context.Background())
reader := procfs.NewProcfsReader(*logger)
db, err := processdb.NewDB(ctx, metricsReg, reader, logger, c.DBReaperPeriod, c.ReapProcesses)
db, err := processdb.NewDB(ctx, procDBReg, reader, logger, c.DBReaperPeriod, c.ReapProcesses)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create DB: %w", err)
Expand All @@ -82,7 +102,8 @@ func New(cfg *cfg.C) (beat.Processor, error) {

switch c.Backend {
case "auto":
p, err = kerneltracingprovider.NewProvider(ctx, logger)
procDBReg := genRegistry(monitoring.Default, regNameKernelTracing)
p, err = kerneltracingprovider.NewProvider(ctx, logger, procDBReg)
if err != nil {
// Most likely cause of error is not supporting ebpf or kprobes on system, try procfs
backfilledPIDs := db.ScrapeProcfs()
Expand All @@ -108,7 +129,8 @@ func New(cfg *cfg.C) (beat.Processor, error) {
}
pType = procfsType
case "kernel_tracing":
p, err = kerneltracingprovider.NewProvider(ctx, logger)
procDBReg := genRegistry(monitoring.Default, regNameKernelTracing)
p, err = kerneltracingprovider.NewProvider(ctx, logger, procDBReg)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create kernel_tracing provider: %w", err)
Expand Down
22 changes: 22 additions & 0 deletions x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package sessionmd

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -339,6 +340,27 @@ var (
logger = logp.NewLogger("add_session_metadata_test")
)

func TestMetricsSetup(t *testing.T) {
	// Registering the same base name repeatedly must not panic; each
	// collision should land under a numbered sibling registry instead.
	registry := monitoring.NewRegistry()
	nameA := "test.metrics"
	nameB := "other.stuff"

	// First registration claims the plain base name.
	genRegistry(registry, nameA)
	require.NotNil(t, registry.Get(nameA))

	// A second registration of the same base gets the ".1" suffix.
	genRegistry(registry, nameA)
	require.NotNil(t, registry.Get(fmt.Sprintf("%s.1", nameA)))

	// A different base name starts over at the plain name…
	genRegistry(registry, nameB)
	require.NotNil(t, registry.Get(nameB))
	require.Nil(t, registry.Get(fmt.Sprintf("%s.1", nameB)))

	// …and only collides into ".1" on its own second registration.
	genRegistry(registry, nameB)
	require.NotNil(t, registry.Get(nameB))
	require.NotNil(t, registry.Get(fmt.Sprintf("%s.1", nameB)))
	require.Nil(t, registry.Get(fmt.Sprintf("%s.2", nameB)))
}

func TestEnrich(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15)
defer cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/provider"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)

type prvdr struct {
Expand Down Expand Up @@ -82,14 +83,16 @@ func readPIDNsInode() (uint64, error) {
}

// NewProvider returns a new instance of kerneltracingprovider
func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, error) {
func NewProvider(ctx context.Context, logger *logp.Logger, reg *monitoring.Registry) (provider.Provider, error) {
attr := quark.DefaultQueueAttr()
attr.Flags = quark.QQ_ALL_BACKENDS | quark.QQ_ENTRY_LEADER | quark.QQ_NO_SNAPSHOT
qq, err := quark.OpenQueue(attr, 64)
if err != nil {
return nil, fmt.Errorf("open queue: %w", err)
}

procMetrics := NewStats(reg)

p := &prvdr{
ctx: ctx,
logger: logger,
Expand All @@ -102,7 +105,10 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e
backoffSkipped: 0,
}

go func(ctx context.Context, qq *quark.Queue, logger *logp.Logger, p *prvdr) {
go func(ctx context.Context, qq *quark.Queue, logger *logp.Logger, p *prvdr, stats *Stats) {

lastUpdate := time.Now()

defer qq.Close()
for ctx.Err() == nil {
p.qqMtx.Lock()
Expand All @@ -112,6 +118,19 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e
logger.Errorw("get events from quark, no more process enrichment from this processor will be done", "error", err)
break
}
if time.Since(lastUpdate) > time.Second*5 {
p.qqMtx.Lock()
metrics := qq.Stats()
p.qqMtx.Unlock()

stats.Aggregations.Set(metrics.Aggregations)
stats.Insertions.Set(metrics.Insertions)
stats.Lost.Set(metrics.Lost)
stats.NonAggregations.Set(metrics.NonAggregations)
stats.Removals.Set(metrics.Removals)
lastUpdate = time.Now()
}

if len(events) == 0 {
err = qq.Block()
if err != nil {
Expand All @@ -120,7 +139,7 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e
}
}
}
}(ctx, qq, logger, p)
}(ctx, qq, logger, p, procMetrics)

bootID, err = readBootID()
if err != nil {
Expand Down Expand Up @@ -150,11 +169,8 @@ const (
// does not exceed a reasonable threshold that would delay all other events processed by auditbeat. When in the backoff state, enrichment
// will proceed without waiting for the process data to exist in the cache, likely resulting in missing enrichment data.
func (p *prvdr) Sync(_ *beat.Event, pid uint32) error {
p.qqMtx.Lock()
defer p.qqMtx.Unlock()

// If pid is already in qq, return immediately
if _, found := p.qq.Lookup(int(pid)); found {
if _, found := p.lookupLocked(pid); found {
return nil
}

Expand All @@ -169,7 +185,7 @@ func (p *prvdr) Sync(_ *beat.Event, pid uint32) error {
nextWait := 5 * time.Millisecond
for {
waited := time.Since(start)
if _, found := p.qq.Lookup(int(pid)); found {
if _, found := p.lookupLocked(pid); found {
p.logger.Debugw("got process that was missing ", "waited", waited)
p.combinedWait = p.combinedWait + waited
return nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build linux && (amd64 || arm64) && cgo

package kerneltracingprovider

import (
"github.com/elastic/elastic-agent-libs/monitoring"
)

// Stats tracks the quark internal stats, which are integrated into the beats monitoring runtime.
type Stats struct {
	Insertions      *monitoring.Uint // count of process events inserted by quark
	Removals        *monitoring.Uint // count of process events removed by quark
	Aggregations    *monitoring.Uint // count of events quark merged into aggregates
	NonAggregations *monitoring.Uint // count of events quark passed through un-aggregated
	Lost            *monitoring.Uint // count of events quark dropped/lost
}

// NewStats registers the quark stat counters on reg and returns the
// container used to publish them.
func NewStats(reg *monitoring.Registry) *Stats {
	stats := &Stats{}
	stats.Insertions = monitoring.NewUint(reg, "insertions")
	stats.Removals = monitoring.NewUint(reg, "removals")
	stats.Aggregations = monitoring.NewUint(reg, "aggregations")
	stats.NonAggregations = monitoring.NewUint(reg, "nonaggregations")
	stats.Lost = monitoring.NewUint(reg, "lost")
	return stats
}

0 comments on commit 89ae1e0

Please sign in to comment.