Skip to content

Commit

Permalink
[chore] - Introduce a helper to setup registry (#42943)
Browse files Browse the repository at this point in the history
* chore: introduce a helper

* use "libbeat" registry

* chore: move registry to b.Info
  • Loading branch information
VihasMakwana authored Mar 4, 2025
1 parent 02c934b commit 61aadfe
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 49 deletions.
3 changes: 3 additions & 0 deletions libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type Info struct {
Monitoring struct {
DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring
Namespace *monitoring.Namespace // a monitor namespace that is unique per beat instance

StateRegistry *monitoring.Registry
InfoRegistry *monitoring.Registry
}
LogConsumer consumer.Logs // otel log consumer
UseDefaultProcessors bool // Whether to use the default processors
Expand Down
108 changes: 59 additions & 49 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, u

b.Beat.Info.Monitoring.Namespace = monitoring.GetNamespace(b.Info.Beat + "-" + b.Info.ID.String())

b.SetupRegistry()

instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version)
if err != nil {
return nil, fmt.Errorf("error setting up instrumentation: %w", err)
Expand Down Expand Up @@ -465,12 +467,17 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, u

uniq_reg := b.Beat.Info.Monitoring.Namespace.GetRegistry()

reg := b.Info.Monitoring.StateRegistry.GetRegistry("libbeat")
if reg == nil {
reg = b.Info.Monitoring.StateRegistry.NewRegistry("libbeat")
}

tel := uniq_reg.GetRegistry("state")
if tel == nil {
tel = uniq_reg.NewRegistry("state")
}
monitors := pipeline.Monitors{
Metrics: uniq_reg,
Metrics: reg,
Telemetry: tel,
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
Expand Down Expand Up @@ -572,7 +579,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
}

// Report central management state
mgmt := monitoring.GetNamespace("state").GetRegistry().NewRegistry("management")
mgmt := b.Info.Monitoring.StateRegistry.NewRegistry("management")
monitoring.NewBool(mgmt, "enabled").Set(b.Manager.Enabled())

debugf("Initializing output plugins")
Expand All @@ -590,7 +597,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
var publisher *pipeline.Pipeline
monitors := pipeline.Monitors{
Metrics: reg,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Telemetry: b.Info.Monitoring.StateRegistry,
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
}
Expand Down Expand Up @@ -655,7 +662,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
svc.BeforeRun()
defer svc.Cleanup()

b.registerMetrics()
b.RegisterMetrics()

// Start the API Server before the Seccomp lock down, we do this so we can create the unix socket
// set the appropriate permission on the unix domain file without having to whitelist anything
Expand Down Expand Up @@ -763,75 +770,54 @@ func (b *Beat) reexec() error {
return b.doReexec()
}

// registerMetrics registers metrics with the internal monitoring API. This data
// RegisterMetrics registers metrics with the internal monitoring API. This data
// is then exposed through the HTTP monitoring endpoint (e.g. /info and /state)
// and/or pushed to Elasticsearch through the x-pack monitoring feature.
func (b *Beat) registerMetrics() {
func (b *Beat) RegisterMetrics() {
// info
infoRegistry := monitoring.GetNamespace("info").GetRegistry()
monitoring.NewString(infoRegistry, "version").Set(b.Info.Version)
monitoring.NewString(infoRegistry, "beat").Set(b.Info.Beat)
monitoring.NewString(infoRegistry, "name").Set(b.Info.Name)
monitoring.NewString(infoRegistry, "uuid").Set(b.Info.ID.String())
monitoring.NewString(infoRegistry, "ephemeral_id").Set(b.Info.EphemeralID.String())
monitoring.NewString(infoRegistry, "binary_arch").Set(runtime.GOARCH)
monitoring.NewString(infoRegistry, "build_commit").Set(version.Commit())
monitoring.NewTimestamp(infoRegistry, "build_time").Set(version.BuildTime())
monitoring.NewBool(infoRegistry, "elastic_licensed").Set(b.Info.ElasticLicensed)
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "version").Set(b.Info.Version)
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "beat").Set(b.Info.Beat)
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "name").Set(b.Info.Name)
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "uuid").Set(b.Info.ID.String())
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "ephemeral_id").Set(b.Info.EphemeralID.String())
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "binary_arch").Set(runtime.GOARCH)
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "build_commit").Set(version.Commit())
monitoring.NewTimestamp(b.Info.Monitoring.InfoRegistry, "build_time").Set(version.BuildTime())
monitoring.NewBool(b.Info.Monitoring.InfoRegistry, "elastic_licensed").Set(b.Info.ElasticLicensed)

// Add user metadata data asynchronously (on Windows the lookup can take up to 60s).
go func() {
if u, err := user.Current(); err != nil {
// This usually happens if the user UID does not exist in /etc/passwd. It might be the case on K8S
// if the user set securityContext.runAsUser to an arbitrary value.
monitoring.NewString(infoRegistry, "uid").Set(strconv.Itoa(os.Getuid()))
monitoring.NewString(infoRegistry, "gid").Set(strconv.Itoa(os.Getgid()))
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "uid").Set(strconv.Itoa(os.Getuid()))
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "gid").Set(strconv.Itoa(os.Getgid()))
} else {
monitoring.NewString(infoRegistry, "username").Set(u.Username)
monitoring.NewString(infoRegistry, "uid").Set(u.Uid)
monitoring.NewString(infoRegistry, "gid").Set(u.Gid)
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "username").Set(u.Username)
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "uid").Set(u.Uid)
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "gid").Set(u.Gid)
}
}()

stateRegistry := monitoring.GetNamespace("state").GetRegistry()

// state.service
serviceRegistry := stateRegistry.NewRegistry("service")
serviceRegistry := b.Info.Monitoring.StateRegistry.NewRegistry("service")
monitoring.NewString(serviceRegistry, "version").Set(b.Info.Version)
monitoring.NewString(serviceRegistry, "name").Set(b.Info.Beat)
monitoring.NewString(serviceRegistry, "id").Set(b.Info.ID.String())

// state.beat
beatRegistry := stateRegistry.NewRegistry("beat")
beatRegistry := b.Info.Monitoring.StateRegistry.NewRegistry("beat")
monitoring.NewString(beatRegistry, "name").Set(b.Info.Name)
}

func (b *Beat) RegisterHostname(useFQDN bool) {
hostname := b.Info.FQDNAwareHostname(useFQDN)

// info.hostname
var infoRegistry *monitoring.Registry
if b.Info.Monitoring.Namespace != nil {
infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("info")
if infoRegistry == nil {
infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("info")
}
} else {
infoRegistry = monitoring.GetNamespace("info").GetRegistry()
}
monitoring.NewString(infoRegistry, "hostname").Set(hostname)
monitoring.NewString(b.Info.Monitoring.InfoRegistry, "hostname").Set(hostname)

// state.host
var stateRegistry *monitoring.Registry
if b.Info.Monitoring.Namespace != nil {
stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("state")
if stateRegistry == nil {
stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("state")
}
} else {
stateRegistry = monitoring.GetNamespace("state").GetRegistry()
}
monitoring.NewFunc(stateRegistry, "host", host.ReportInfo(hostname), monitoring.Report)
monitoring.NewFunc(b.Info.Monitoring.StateRegistry, "host", host.ReportInfo(hostname), monitoring.Report)
}

// TestConfig check all settings are ok and the beat can be run
Expand Down Expand Up @@ -971,6 +957,30 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er
}())
}

func (b *Beat) SetupRegistry() {
var infoRegistry *monitoring.Registry
if b.Info.Monitoring.Namespace != nil {
infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("info")
if infoRegistry == nil {
infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("info")
}
} else {
infoRegistry = monitoring.GetNamespace("info").GetRegistry()
}
b.Info.Monitoring.InfoRegistry = infoRegistry

var stateRegistry *monitoring.Registry
if b.Info.Monitoring.Namespace != nil {
stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("state")
if stateRegistry == nil {
stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("state")
}
} else {
stateRegistry = monitoring.GetNamespace("state").GetRegistry()
}
b.Info.Monitoring.StateRegistry = stateRegistry
}

// handleFlags converts -flag to --flags, parses the command line
// flags, and it invokes the HandleFlags callback if implemented by
// the Beat.
Expand All @@ -992,6 +1002,8 @@ func (b *Beat) configure(settings Settings) error {
return fmt.Errorf("error loading config file: %w", err)
}

b.SetupRegistry()

if err := initPaths(cfg); err != nil {
return err
}
Expand Down Expand Up @@ -1515,8 +1527,7 @@ func (b *Beat) registerClusterUUIDFetching() {

// Build and return a callback to fetch the Elasticsearch cluster_uuid for monitoring
func (b *Beat) clusterUUIDFetchingCallback() elasticsearch.ConnectCallback {
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
elasticsearchRegistry := stateRegistry.NewRegistry("outputs.elasticsearch")
elasticsearchRegistry := b.Info.Monitoring.StateRegistry.NewRegistry("outputs.elasticsearch")
clusterUUIDRegVar := monitoring.NewString(elasticsearchRegistry, "cluster_uuid")

callback := func(esClient *eslegclient.Connection) error {
Expand Down Expand Up @@ -1553,8 +1564,7 @@ func (b *Beat) setupMonitoring(settings Settings) (report.Reporter, error) {

// Expose monitoring.cluster_uuid in state API
if monitoringClusterUUID != "" {
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
monitoringRegistry := stateRegistry.NewRegistry("monitoring")
monitoringRegistry := b.Info.Monitoring.StateRegistry.NewRegistry("monitoring")
clusterUUIDRegVar := monitoring.NewString(monitoringRegistry, "cluster_uuid")
clusterUUIDRegVar.Set(monitoringClusterUUID)
}
Expand Down

0 comments on commit 61aadfe

Please sign in to comment.