From d27028ae9600043640e181a15b07c5175837f8c8 Mon Sep 17 00:00:00 2001 From: Yash Bhardwaj Date: Tue, 16 Jan 2024 15:53:46 +0530 Subject: [PATCH] feat: job events metric push (#191) * feat: prometheus metric push * fix: push job name --- config/config_server.go | 5 +++-- core/scheduler/service/job_run_service.go | 14 ++++++++++---- go.mod | 2 +- internal/telemetry/prometheus.go | 22 ++++++++++++++++++++++ internal/telemetry/telemetry.go | 2 ++ 5 files changed, 38 insertions(+), 7 deletions(-) diff --git a/config/config_server.go b/config/config_server.go index 21f36319ac..d07769da0d 100644 --- a/config/config_server.go +++ b/config/config_server.go @@ -25,8 +25,9 @@ type DBConfig struct { } type TelemetryConfig struct { - ProfileAddr string `mapstructure:"profile_addr"` - JaegerAddr string `mapstructure:"jaeger_addr"` + ProfileAddr string `mapstructure:"profile_addr"` + JaegerAddr string `mapstructure:"jaeger_addr"` + MetricServerAddr string `mapstructure:"telegraf_addr"` } type ResourceManager struct { diff --git a/core/scheduler/service/job_run_service.go b/core/scheduler/service/job_run_service.go index 10e9e84e90..dff9516e7b 100644 --- a/core/scheduler/service/job_run_service.go +++ b/core/scheduler/service/job_run_service.go @@ -423,12 +423,15 @@ func (s *JobRunService) updateJobRunSLA(ctx context.Context, event *scheduler.Ev s.l.Error("error updating job run sla status", err) return err } - telemetry.NewCounter(metricJobRunEvents, map[string]string{ + err = telemetry.SetGaugeViaPush(metricJobRunEvents, map[string]string{ "project": event.Tenant.ProjectName().String(), "namespace": event.Tenant.NamespaceName().String(), "name": event.JobName.String(), "status": scheduler.SLAMissEvent.String(), - }).Inc() + }, 1) + if err != nil { + s.l.Error("failed metric push", err) + } return nil } @@ -466,12 +469,15 @@ func (s *JobRunService) raiseJobRunStateChangeEvent(jobRun *scheduler.JobRun) { return } s.eventHandler.HandleEvent(schedulerEvent) - telemetry.NewCounter(metricJobRunEvents, map[string]string{ + err = telemetry.SetGaugeViaPush(metricJobRunEvents, map[string]string{ "project": jobRun.Tenant.ProjectName().String(), "namespace": jobRun.Tenant.NamespaceName().String(), "name": jobRun.JobName.String(), "status": jobRun.State.String(), - }).Inc() + }, 1) + if err != nil { + s.l.Error("failed metric push", err) + } } func (s *JobRunService) createOperatorRun(ctx context.Context, event *scheduler.Event, operatorType scheduler.OperatorType) error { diff --git a/go.mod b/go.mod index dca109d8a6..e5e3d72358 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/mitchellh/mapstructure v1.4.3 github.com/olekukonko/tablewriter v0.0.5 github.com/prometheus/client_golang v1.11.0 + github.com/prometheus/common v0.30.0 github.com/robfig/cron/v3 v3.0.1 github.com/schollz/progressbar/v3 v3.8.5 github.com/segmentio/kafka-go v0.4.39 @@ -136,7 +137,6 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect - github.com/prometheus/common v0.30.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect diff --git a/internal/telemetry/prometheus.go b/internal/telemetry/prometheus.go index 949b8da7ef..e14545849b 100644 --- a/internal/telemetry/prometheus.go +++ b/internal/telemetry/prometheus.go @@ -6,6 +6,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/push" + "github.com/prometheus/common/expfmt" ) var ( @@ -14,8 +16,12 @@ var ( gaugeMetricMap = map[string]prometheus.Gauge{} gaugeMetricMutex = sync.Mutex{} + + MetricServer string ) +const metricsPushJob = "optimus_push" + func getKey(metric string, labels map[string]string) string { eventMetricKey := metric keys := make([]string, 0, len(labels)) @@ -43,6 +49,22 @@ func NewCounter(metric string, labels map[string]string) prometheus.Counter { return newMetric } +func SetGaugeViaPush(name string, labels map[string]string, val float64) error { + if MetricServer == "" { + return nil + } + metric := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: name, + ConstLabels: labels, + }) + metric.Set(val) + + return push.New(MetricServer, metricsPushJob). + Format(expfmt.FmtText). + Collector(metric). + Push() +} + func NewGauge(metric string, labels map[string]string) prometheus.Gauge { metricKey := getKey(metric, labels) diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 02b1821623..0ab0df6198 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -47,6 +47,8 @@ func Init(l log.Logger, conf config.TelemetryConfig) (func(), error) { otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) } + MetricServer = conf.MetricServerAddr + var metricServer *http.Server if conf.ProfileAddr != "" { l.Debug("enabling profile metrics", "addr", conf.ProfileAddr)