Skip to content

Commit

Permalink
add per-input metrics to libbeat pipeline client
Browse files Browse the repository at this point in the history
Aggregates per-input metrics in libbeat/publisher/pipeline.client. New metrics:
`events_pipeline_total`, `events_pipeline_filtered_total`, and
`events_pipeline_published_total`.

v2.Context includes a metric registry for context-specific metrics, created
under `beat.Info.Monitoring.Namespace` and named after the context ID. If the
context lacks an ID, a 'discard' metric registry, associated with no namespace,
is used, preventing metric publication.

v2.Input passes the v2.Context metric registry when creating the pipeline
client.

Introduces a new `inputmon` API, enabling inputs to publish their metrics via
the HTTP monitoring endpoint, replacing the now deprecated
`inputmon.NewInputRegistry`. Inputs must still register metrics using
`inputmon.RegisterMetrics` for them to be published, and call
`inputmon.UnregisterMetrics` to release resources on close.
The deprecated `inputmon.NewInputRegistry` remains functional.

Integration tests added for `filestream`, `httpjson`, and `cel` to verify new
metrics.
  • Loading branch information
AndersonQ committed Mar 7, 2025
1 parent ca4ad40 commit 40f57e8
Show file tree
Hide file tree
Showing 23 changed files with 1,042 additions and 148 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add support for marked redaction to x-pack/filebeat/input/internal/private {pull}41212[41212]
- Add support for collecting Okta role and factor data for users with filebeat entityanalytics input. {pull}41044[41044]
- Add CEL input program evaluation coverage collection support. {pull}41884[41884]
- Add a metric registry to the v2.Context. This should be the registry used by
inputs to publish metrics. This registry is then passed to the pipeline client
by the input/v2. When receiving a non-nil metrics registry the pipeline client
will aggregate metrics by input on this registry. It'll collect
`events_pipeline_total`, `events_pipeline_filtered_total`,
`events_pipeline_published_total`.{pull}42618[42618] {issue}42761[42761]

==== Deprecated

Expand Down
10 changes: 9 additions & 1 deletion filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-concert/unison"
)

Expand Down Expand Up @@ -128,7 +129,14 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input)
go func(wg *sync.WaitGroup, grp *unison.TaskGroup) {
defer wg.Done()
defer func() { _ = grp.Stop() }()
inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, ID: "fake-ID"}
beatInfo := beat.Info{}
beatInfo.Monitoring.Namespace = monitoring.GetNamespace("dataset")
inputCtx := v2.Context{
ID: "fake-ID",
Agent: beatInfo,
Logger: logp.L(),
Cancelation: ctx,
}
_ = inp.Run(inputCtx, e.pipeline)
}(&e.wg, &e.grp)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ func startHarvester(
defer releaseResource(resource)

client, err := hg.pipeline.ConnectWith(beat.ClientConfig{
EventListener: newInputACKHandler(hg.ackCH),
InputMetricsRegistry: ctx.MetricRegistry(),
EventListener: newInputACKHandler(hg.ackCH),
})
if err != nil {
hg.readers.remove(srcID)
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/filestream/internal/input-logfile/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (inp *managedInput) Test(ctx input.TestContext) error {
return inp.prospector.Test()
}

// Run
// Run runs the input
func (inp *managedInput) Run(
ctx input.Context,
pipeline beat.PipelineConnector,
Expand All @@ -64,7 +64,7 @@ func (inp *managedInput) Run(
defer cancel()
ctx.Cancelation = cancelCtx

metrics := NewMetrics(inp.metricsID)
metrics := NewMetrics(ctx, inp.metricsID)
defer metrics.Close()

hg := &defaultHarvesterGroup{
Expand All @@ -88,7 +88,7 @@ func (inp *managedInput) Run(
sourceStore := newSourceStore(prospectorStore, inp.sourceIdentifier)

// Mark it as running for now.
// Any errors encountered by harverter will change state to Degraded
// Any errors encountered by harvester will change state to Degraded
ctx.UpdateStatus(status.Running, "")

inp.prospector.Run(ctx, sourceStore, hg)
Expand Down
19 changes: 16 additions & 3 deletions filebeat/input/filestream/internal/input-logfile/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package input_logfile
import (
"github.com/rcrowley/go-metrics"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
Expand Down Expand Up @@ -54,17 +55,29 @@ func (m *Metrics) Close() {
m.unregister()
}

func NewMetrics(id string) *Metrics {
func NewMetrics(ctx v2.Context, id string) *Metrics {
// The log input creates the `filebeat.harvester` registry as a package
// variable, so it should always exist before this function runs.
// However at least on testing scenarios this does not hold true, so
// However, at least on testing scenarios this does not hold true, so
// if needed, we create the registry ourselves.
harvesterMetrics := monitoring.Default.GetRegistry("filebeat.harvester")
if harvesterMetrics == nil {
harvesterMetrics = monitoring.Default.NewRegistry("filebeat.harvester")
}

reg, unreg := inputmon.NewInputRegistry("filestream", id, nil)
var reg *monitoring.Registry
var unreg = func() {}
if id != "" {
reg = ctx.EnhanceMetricRegistry(id, "filestream")
err := inputmon.RegisterMetrics(id, reg)
if err != nil {
ctx.Logger.Warn("failed to register filestream metrics: ", err)
}
unreg = func() { inputmon.UnregisterMetrics(id) }
} else {
reg = monitoring.NewRegistry()
}

m := Metrics{
unregister: unreg,
FilesOpened: monitoring.NewUint(reg, "files_opened_total"),
Expand Down
24 changes: 16 additions & 8 deletions filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,22 +133,26 @@ func (r *runner) Start() {
go func() {
defer r.wg.Done()
log.Infof("Input '%s' starting", name)
ctx := v2.Context{
ID: r.id,
IDWithoutName: r.id,
Agent: *r.agent,
Logger: log,
Cancelation: r.sig,
StatusReporter: r.statusReporter,
}
err := r.input.Run(
v2.Context{
ID: r.id,
IDWithoutName: r.id,
Agent: *r.agent,
Logger: log,
Cancelation: r.sig,
StatusReporter: r.statusReporter,
},
ctx,
r.connector,
)
if err != nil && !errors.Is(err, context.Canceled) {
log.Errorf("Input '%s' failed with: %+v", name, err)
} else {
log.Infof("Input '%s' stopped (goroutine)", name)
}

// Input finished running, unregister the metrics
ctx.UnregisterMetrics()
}()
}

Expand All @@ -159,6 +163,10 @@ func (r *runner) Stop() {
r.statusReporter = nil
}

// configID extracts or generates an ID for a configuration.
// If the "id" is present in config and is non-empty, it is returned.
// If the "id" is absent or empty, the function calculates a hash of the
// entire configuration and returns it as a hexadecimal string as the ID.
func configID(config *conf.C) (string, error) {
tmp := struct {
ID string `config:"id"`
Expand Down
9 changes: 7 additions & 2 deletions filebeat/input/v2/input-cursor/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,16 @@ func (inp *managedInput) Run(
defer cancel()
ctx.Cancelation = cancelCtx

// This context isn't directly used, so unregister any metric it might have
// registered.
ctx.UnregisterMetrics()
var grp unison.MultiErrGroup
for _, source := range inp.sources {
source := source
grp.Go(func() (err error) {
// refine per worker context
inpCtx := ctx
inpCtx := ctx.Clone()

// Preserve IDWithoutName, in case the context was constructed who knows how
inpCtx.IDWithoutName = ctx.ID
inpCtx.ID = ctx.ID + "::" + source.Name()
Expand Down Expand Up @@ -147,7 +151,8 @@ func (inp *managedInput) runSource(
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
EventListener: newInputACKHandler(ctx.Logger),
InputMetricsRegistry: ctx.MetricRegistry(),
EventListener: newInputACKHandler(ctx.Logger),
})
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion filebeat/input/v2/input-stateless/stateless.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) (
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.DefaultGuarantees,
InputMetricsRegistry: ctx.MetricRegistry(),
PublishMode: beat.DefaultGuarantees,
})
if err != nil {
return err
Expand Down
86 changes: 85 additions & 1 deletion filebeat/input/v2/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package v2

import (
"context"
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/elastic/go-concert/unison"
)
Expand Down Expand Up @@ -79,6 +81,12 @@ type Context struct {
// The input ID.
ID string

// GeneratedID is true when the config does not include an ID and therefore
// the runner uses a hash from the config as ID. It's used to know if the
// v2.Context generated for the input should have a non-discard metric
// registry.
GeneratedID bool

// The input ID without name. Some inputs append sourcename, we need the id to be untouched
// https://github.com/elastic/beats/blob/43d80af2aea60b0c45711475d114e118d90c4581/filebeat/input/v2/input-cursor/input.go#L118
IDWithoutName string
Expand All @@ -93,15 +101,91 @@ type Context struct {
// that maps to the config. Note: Under standalone execution of Filebeat this is
// expected to be nil.
StatusReporter status.StatusReporter

// monitoringRegistry is the registry collecting metrics for this input.
//
// This registry resides in beat.Info.Monitoring.Namespace, which is a
// unique namespace for beats running as OTel receivers or the global
// 'dataset' for compatibility reasons.
//
// Inputs without an ID, GeneratedID is true, have a 'discard' registry
// which is associated to no namespace resulting in the metrics not being
// published.
//
// Inputs must call EnhanceMetricRegistry to add the necessary variables for
// the metrics to be valid to be published by the HTTP monitoring endpoint.
// Also, inputs need to register its metrics for publishing by the
// monitoring endpoint by calling inputmon.RegisterMetrics.
monitoringRegistry *monitoring.Registry
monitoringRegistryCancel func()
}

func (c Context) UpdateStatus(status status.Status, msg string) {
func (c *Context) UpdateStatus(status status.Status, msg string) {
if c.StatusReporter != nil {
c.Logger.Debugf("updating status, status: '%s', message: '%s'", status.String(), msg)
c.StatusReporter.UpdateStatus(status, msg)
}
}

// MetricsID is the context ID with any '.' replaced by '_'.
func (c *Context) MetricsID() string {
return strings.ReplaceAll(c.ID, ".", "_")
}

// MetricRegistry returns the metric registry associated with this context.
// This should be the metric registry used by inputs to publish their metrics.
// If this context ID was generated, GeneratedID is true, a 'discard'
// registry is returned. It's a registry associated to no namespace and
// therefore its metrics cannot be published.
// If this context ID comes from the config, GeneratedID false, then a new
// metric registry named ID will be added to the beat.Info.Monitoring.Namespace
// or the existing registry will be returned.
func (c *Context) MetricRegistry() *monitoring.Registry {
if c.MetricsID() == "" {
return monitoring.NewRegistry()
}

if c.monitoringRegistry == nil {
reg := c.Agent.Monitoring.Registry().GetRegistry(c.MetricsID())
if reg == nil {
reg = c.Agent.Monitoring.Registry().NewRegistry(c.MetricsID())
}

c.monitoringRegistry = reg
c.monitoringRegistryCancel = func() {
c.Agent.Monitoring.Registry().Remove(c.MetricsID())
}
}

return c.monitoringRegistry
}

// EnhanceMetricRegistry adds `id` and `input` to the metric registry and
// returns the registry.
func (c *Context) EnhanceMetricRegistry(id, inputType string) *monitoring.Registry {
monitoring.NewString(c.MetricRegistry(), "input").Set(inputType)
monitoring.NewString(c.MetricRegistry(), "id").Set(id)

return c.MetricRegistry()
}

func (c *Context) UnregisterMetrics() {
if c.monitoringRegistryCancel != nil {
c.monitoringRegistryCancel()
}
}

// Clone is equivalent to newCtx := ctx, but it clears the monitoring
// registry, and it's unregister function.
func (c *Context) Clone() Context {
cc := *c

cc.monitoringRegistry = nil
cc.monitoringRegistryCancel = nil

return cc
}

// TestContext provides the Input Test function with common environmental
// information and services.
type TestContext struct {
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/winlog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (in winlogInput) Run(
api, _ := source.(eventlog.EventLog)
log := ctx.Logger.With("eventlog", source.Name(), "channel", api.Channel())
return eventlog.Run(
ctx,
&ctx,
ctxtool.FromCanceller(ctx.Cancelation),
api,
initCheckpoint(log, cursor),
Expand Down
Loading

0 comments on commit 40f57e8

Please sign in to comment.