Skip to content

Commit

Permalink
feat: job events metric push (#191)
Browse files Browse the repository at this point in the history
* feat: prometheus metric push

* fix: push job name
  • Loading branch information
Mryashbhardwaj committed Jan 23, 2024
1 parent d778f70 commit d27028a
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 7 deletions.
5 changes: 3 additions & 2 deletions config/config_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type DBConfig struct {
}

// TelemetryConfig holds the addresses used to configure telemetry.
type TelemetryConfig struct {
ProfileAddr string `mapstructure:"profile_addr"`
JaegerAddr string `mapstructure:"jaeger_addr"`
ProfileAddr string `mapstructure:"profile_addr"`
JaegerAddr string `mapstructure:"jaeger_addr"`
// MetricServerAddr is the address metrics are pushed to. Note that it is
// read from the "telegraf_addr" config key, not a key named after the field.
MetricServerAddr string `mapstructure:"telegraf_addr"`
}

type ResourceManager struct {
Expand Down
14 changes: 10 additions & 4 deletions core/scheduler/service/job_run_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,15 @@ func (s *JobRunService) updateJobRunSLA(ctx context.Context, event *scheduler.Ev
s.l.Error("error updating job run sla status", err)
return err
}
telemetry.NewCounter(metricJobRunEvents, map[string]string{
err = telemetry.SetGaugeViaPush(metricJobRunEvents, map[string]string{
"project": event.Tenant.ProjectName().String(),
"namespace": event.Tenant.NamespaceName().String(),
"name": event.JobName.String(),
"status": scheduler.SLAMissEvent.String(),
}).Inc()
}, 1)
if err != nil {
s.l.Error("failed metric push", err)
}
return nil
}

Expand Down Expand Up @@ -466,12 +469,15 @@ func (s *JobRunService) raiseJobRunStateChangeEvent(jobRun *scheduler.JobRun) {
return
}
s.eventHandler.HandleEvent(schedulerEvent)
telemetry.NewCounter(metricJobRunEvents, map[string]string{
err = telemetry.SetGaugeViaPush(metricJobRunEvents, map[string]string{
"project": jobRun.Tenant.ProjectName().String(),
"namespace": jobRun.Tenant.NamespaceName().String(),
"name": jobRun.JobName.String(),
"status": jobRun.State.String(),
}).Inc()
}, 1)
if err != nil {
s.l.Error("failed metric push", err)
}
}

func (s *JobRunService) createOperatorRun(ctx context.Context, event *scheduler.Event, operatorType scheduler.OperatorType) error {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/mitchellh/mapstructure v1.4.3
github.com/olekukonko/tablewriter v0.0.5
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.30.0
github.com/robfig/cron/v3 v3.0.1
github.com/schollz/progressbar/v3 v3.8.5
github.com/segmentio/kafka-go v0.4.39
Expand Down Expand Up @@ -136,7 +137,6 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.30.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
Expand Down
22 changes: 22 additions & 0 deletions internal/telemetry/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/push"
"github.com/prometheus/common/expfmt"
)

var (
Expand All @@ -14,8 +16,12 @@ var (

gaugeMetricMap = map[string]prometheus.Gauge{}
gaugeMetricMutex = sync.Mutex{}

MetricServer string
)

const metricsPushJob = "optimus_push"

func getKey(metric string, labels map[string]string) string {
eventMetricKey := metric
keys := make([]string, 0, len(labels))
Expand Down Expand Up @@ -43,6 +49,22 @@ func NewCounter(metric string, labels map[string]string) prometheus.Counter {
return newMetric
}

// SetGaugeViaPush creates a one-off gauge called name, attaches labels as its
// constant labels, sets it to val, and pushes it in text format to the address
// stored in MetricServer under the metricsPushJob job name.
//
// If MetricServer is empty, nothing is pushed and nil is returned. Otherwise
// the result of the push is returned unchanged.
//
// NOTE(review): no grouping labels are added to the pusher. Per the
// client_golang push docs, Push replaces all previously pushed metrics that
// share the same job and grouping key when the target is a Pushgateway —
// confirm this is the intended behavior for the configured server.
func SetGaugeViaPush(name string, labels map[string]string, val float64) error {
if MetricServer == "" {
return nil
}

opts := prometheus.GaugeOpts{
Name: name,
ConstLabels: labels,
}
gauge := prometheus.NewGauge(opts)
gauge.Set(val)

pusher := push.New(MetricServer, metricsPushJob)
pusher = pusher.Format(expfmt.FmtText)
pusher = pusher.Collector(gauge)
return pusher.Push()
}

func NewGauge(metric string, labels map[string]string) prometheus.Gauge {
metricKey := getKey(metric, labels)

Expand Down
2 changes: 2 additions & 0 deletions internal/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func Init(l log.Logger, conf config.TelemetryConfig) (func(), error) {
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
}

MetricServer = conf.MetricServerAddr

var metricServer *http.Server
if conf.ProfileAddr != "" {
l.Debug("enabling profile metrics", "addr", conf.ProfileAddr)
Expand Down

0 comments on commit d27028a

Please sign in to comment.