diff --git a/CHANGELOG.md b/CHANGELOG.md
index e227128..d2a625b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+- 0.4.4-1 (2025-01-31):
+  + Fixed major I/O performance issue when writing metrics to DuckDB.
+
 - 0.4.3-1 (2025-01-30):
   + Fixed zoom reset in charts.
 
diff --git a/Makefile b/Makefile
index 6394be5..64ca6f6 100644
--- a/Makefile
+++ b/Makefile
@@ -3,7 +3,7 @@
 SHELL := /bin/bash
 ROOT := $(abspath $(dir $(lastword $(MAKEFILE_LIST))))
 UMASK := 022
-VERSION := 0.4.3
+VERSION := 0.4.4
 ITERATION := 1
 REVISION := $(shell cd '$(ROOT)' && git rev-parse --short HEAD)
 ENVIRONMENT ?= production
diff --git a/docker-compose.yml b/docker-compose.yml
index 330694a..c848a56 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -49,6 +49,21 @@
 #
 #   $ docker compose down --volumes --remove-orphans
 #
+#   $ IMAGE=$(docker build --quiet \
+#       --file extras/github/docker/Dockerfile-jammy \
+#       --build-arg GO_VERSION=1.23.3 \
+#       .)
+#   $ docker run \
+#       --rm \
+#       --volume .:/workspace \
+#       --workdir /workspace \
+#       $IMAGE \
+#       bash -c '\
+#         git config --global --add safe.directory /workspace; \
+#         export PATH=/usr/local/go/bin:$PATH; \
+#         export PLATFORM=jammy; \
+#         make package'
+
 name: varnishmon
 
diff --git a/pkg/workers/archiver.go b/pkg/workers/archiver.go
index e749162..0754323 100644
--- a/pkg/workers/archiver.go
+++ b/pkg/workers/archiver.go
@@ -2,11 +2,9 @@ package workers
 
 import (
 	"context"
-	"errors"
 	"sync"
 	"time"
 
-	duckdb "github.com/marcboeker/go-duckdb"
 	"github.com/prometheus/client_golang/prometheus"
 	"gitlab.com/stone.code/assert"
 
@@ -59,12 +57,12 @@ func NewArchiverWorker(
 		pushCompleted: prometheus.NewCounter(
 			prometheus.CounterOpts{
 				Name: "archiver_push_completed_total",
-				Help: "Successful pushes of metrics by the archiver worker",
+				Help: "Successful pushes of batches of samples by the archiver worker",
 			}),
 		pushFailed: prometheus.NewCounter(
 			prometheus.CounterOpts{
 				Name: "archiver_push_failed_total",
-				Help: "Failed pushes of metrics by the archiver worker",
+				Help: "Failed pushes of batches of samples by the archiver worker",
 			}),
 	}
 
@@ -96,6 +94,8 @@ func (aw *ArchiverWorker) run() {
 		case <-aw.ctx.Done():
 			return
 		case metrics := <-aw.metricsQueue:
+			batch := make([]*storage.MetricSample, 0, len(metrics.Items))
+
 			for name, details := range metrics.Items {
 				// Check if this is the first time seeing the metric.
 				previousMetric, ok := aw.lastMetrics[name]
@@ -115,9 +115,9 @@ func (aw *ArchiverWorker) run() {
 				// Otherwise, the rate per second of an uptime would be
 				// pretty much useless.
 
-				// Skip if this is the first time seeing the metric: counters
-				// are stored as rates calculated based on the previously seen
-				// value.
+				// Skip if this is the first time seeing the metric:
+				// counters are stored as rates calculated based on the
+				// previously seen value.
 				if previousMetric == nil {
 					continue
 				}
@@ -149,8 +149,8 @@ func (aw *ArchiverWorker) run() {
 					continue
 				}
 
-				// Transform the 'uint64' value of the metric by dropping the
-				// highest bit. See: https://github.com/golang/go/issues/6113.
+				// Transform the 'uint64' value of the metric by dropping
+				// the highest bit. See: https://github.com/golang/go/issues/6113.
 				if !details.IsBitmap() && details.Value&0x8000000000000000 != 0 {
 					aw.truncatedSamples.Inc()
 				}
@@ -166,37 +166,25 @@ func (aw *ArchiverWorker) run() {
 					previousMetric.value = details.Value
 				}
 
-				// Store the metric.
-				if err := aw.storage.PushMetricSample(
-					name, metrics.Timestamp, details.Flag, details.Format,
-					details.Description, value); err != nil {
-					aw.pushFailed.Inc()
-
-					aw.app.Cfg().Log().Error().
-						Err(err).
-						Str("name", name).
-						Str("flag", details.Flag).
-						Str("format", details.Format).
-						Interface("value", value).
-						Msg("Failed to store metric!")
-
-					// On DuckDB errors, discard remaining metrics. Typically,
-					// when DuckDB fails to store a metric, it indicates a
-					// permanent issue (e.g., memory allocation failure). It is
-					// better to stop early to avoid flooding the logs with one
-					// error entry for each metric in the batch an to prevent
-					// further damage like a CPU spike.
-					var duckdbErr *duckdb.Error
-					if errors.As(err, &duckdbErr) {
-						aw.app.Cfg().Log().Error().
-							Interface("type", duckdbErr.Type).
-							Interface("msg", duckdbErr.Msg).
-							Msg("Hitting a DuckDB error, stopping further processing of current batch of metrics!")
-						break
-					}
-				} else {
-					aw.pushCompleted.Inc()
-				}
+				// Append the metric sample to the batch.
+				batch = append(batch, &storage.MetricSample{
+					Name:        name,
+					Flag:        details.Flag,
+					Format:      details.Format,
+					Description: details.Description,
+					Value:       value,
+				})
+			}
+
+			if err := aw.storage.PushMetricSamples(metrics.Timestamp, batch); err != nil {
+				aw.pushFailed.Inc()
+				aw.app.Cfg().Log().Error().
+					Err(err).
+					Time("timestamp", metrics.Timestamp).
+					Int("count", len(batch)).
+					Msg("Failed to store batch of samples!")
+			} else {
+				aw.pushCompleted.Inc()
 			}
 		}
 	}
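
Note on the archiver.go change above: instead of one storage call (and one error path) per metric, the worker now accumulates the whole scrape into a slice and pushes it with a single call, which also makes the removed per-metric DuckDB error handling unnecessary. A minimal sketch of the pattern, with simplified types and a hypothetical pushBatch standing in for storage.PushMetricSamples:

```go
package main

import "fmt"

// Sample is a simplified stand-in for storage.MetricSample.
type Sample struct {
	Name  string
	Value uint64
}

// pushBatch stands in for storage.PushMetricSamples: the batch succeeds
// or fails as a single unit instead of one error per metric.
func pushBatch(batch []*Sample) error {
	fmt.Printf("storing %d samples in one call\n", len(batch))
	return nil
}

func main() {
	items := map[string]uint64{"MAIN.uptime": 42, "MAIN.sess_conn": 7}

	// Accumulate the whole scrape before touching storage.
	batch := make([]*Sample, 0, len(items))
	for name, value := range items {
		batch = append(batch, &Sample{Name: name, Value: value})
	}

	// One push per scrape: one counter to bump, one log entry on failure.
	if err := pushBatch(batch); err != nil {
		fmt.Println("failed to store batch:", err)
	}
}
```
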
diff --git a/pkg/workers/storage/init.go b/pkg/workers/storage/init.go
index a99d5d6..40475d5 100644
--- a/pkg/workers/storage/init.go
+++ b/pkg/workers/storage/init.go
@@ -94,7 +94,8 @@ func (stg *Storage) unsafeConfigureDB() {
 		SET memory_limit = '%dMiB';
 		SET threads = '%d';
 		SET temp_directory = '%s';
-		SET max_temp_directory_size = '%dMiB';`,
+		SET max_temp_directory_size = '%dMiB';
+		SET preserve_insertion_order = false;`,
 		stg.app.Cfg().DBMemoryLimit(),
 		stg.app.Cfg().DBThreads(),
 		stg.app.Cfg().DBTempDirectory(),
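
Note on the init.go change above: with preserve_insertion_order disabled, DuckDB is allowed to reorder rows during inserts and during queries that lack an ORDER BY, which lets it parallelize bulk loads and keep memory usage down; an append-heavy metrics store that orders results at query time loses nothing. A minimal sketch of applying such session settings over database/sql (hypothetical path and limit; varnishmon takes the real values from its configuration in unsafeConfigureDB):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/marcboeker/go-duckdb" // registers the "duckdb" driver
)

func main() {
	db, err := sql.Open("duckdb", "/tmp/example.duckdb") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Session settings in the spirit of unsafeConfigureDB(): trade strict
	// insertion order for parallel, lower-memory bulk inserts.
	if _, err := db.Exec(`
		SET memory_limit = '512MiB';
		SET preserve_insertion_order = false;`); err != nil {
		log.Fatal(err)
	}
}
```
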
diff --git a/pkg/workers/storage/main.go b/pkg/workers/storage/main.go
index 3cf5bf0..f9f14b1 100644
--- a/pkg/workers/storage/main.go
+++ b/pkg/workers/storage/main.go
@@ -57,6 +57,13 @@ type CachedMetric struct {
 	Description string
 	Class       string
 }
+type MetricSample struct {
+	Name        string
+	Flag        string
+	Format      string
+	Description string
+	Value       interface{}
+}
 
 func NewStorage(app Application) *Storage {
 	// Create instance.
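
The metrics.go rewrite below is where the I/O win comes from: PushMetricSample, which issued one autocommitted INSERT per sample, becomes PushMetricSamples, which wraps the whole batch in a single transaction and reuses one prepared statement per metric class. Each autocommitted INSERT pays its own commit overhead; one transaction per batch amortizes that cost. The pattern in isolation, as a self-contained sketch (in-memory database and a plain UBIGINT value column instead of the real UNION-typed schema):

```go
package main

import (
	"database/sql"
	"log"
	"time"

	_ "github.com/marcboeker/go-duckdb"
)

func main() {
	// In-memory DuckDB database; the real code opens an on-disk file.
	db, err := sql.Open("duckdb", "")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if _, err := db.Exec(`CREATE TABLE metric_values (
		metric_id INTEGER, timestamp TIMESTAMP, value UBIGINT)`); err != nil {
		log.Fatal(err)
	}

	// One transaction for the whole batch; otherwise every Exec runs in
	// its own transaction and pays the commit I/O individually.
	tx, err := db.Begin()
	if err != nil {
		log.Fatal(err)
	}
	defer tx.Rollback() //nolint:errcheck

	// One prepared statement reused for every row in the batch.
	stmt, err := tx.Prepare(
		`INSERT INTO metric_values (metric_id, timestamp, value) VALUES (?, ?, ?)`)
	if err != nil {
		log.Fatal(err)
	}
	defer stmt.Close()

	now := time.Now()
	for i := 0; i < 1000; i++ {
		if _, err := stmt.Exec(i, now, uint64(i)); err != nil {
			log.Fatal(err)
		}
	}

	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
}
```
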
diff --git a/pkg/workers/storage/metrics.go b/pkg/workers/storage/metrics.go
index 433e54c..5bce8c7 100644
--- a/pkg/workers/storage/metrics.go
+++ b/pkg/workers/storage/metrics.go
@@ -186,40 +186,67 @@ func (stg *Storage) GetMetric(
 	}, nil
 }
 
-func (stg *Storage) PushMetricSample(
-	name string, timestamp time.Time, flag, format, description string,
-	value any) error {
-	// Check if the metric is known and identical to the one in the database.
-	// Non identical metrics will preserve their internal ID, but the rest of
-	// the fields will be updated.
-	var metric *CachedMetric
-	stg.cache.mutex.RLock()
-	if m := stg.cache.metricsByName[name]; m != nil {
-		if m.Flag == flag && m.Format == format && m.Description == description {
-			metric = m
-		}
-	}
-	stg.cache.mutex.RUnlock()
-
+func (stg *Storage) PushMetricSamples(timestamp time.Time, samples []*MetricSample) error {
 	// This is a write operation on 'db' but a read lock is intentionally used.
 	// See the note on the 'Storage' type for more information.
 	stg.mutex.RLock()
 	defer stg.mutex.RUnlock()
 
-	// Insert / update in the 'metrics' table.
-	if metric == nil {
-		var class string
-		switch value.(type) {
-		case uint64:
-			class = "uint64"
-		case float64:
-			class = "float64"
-		default:
-			return ErrInvalidMetricType
+	// Using a transaction is crucial to batch the inserts and avoid performance
+	// penalties.
+	tx, err := stg.db.Begin()
+	if err != nil {
+		return fmt.Errorf("failed to begin transaction: %w", err)
+	}
+	defer tx.Rollback() //nolint:errcheck
+
+	// Build prepared statements for batch inserts, one for each metric class.
+	uint64Statement, err := tx.Prepare(`
+		INSERT INTO metric_values (metric_id, timestamp, value)
+		VALUES (?, ?, union_value(uint64 := ?))`)
+	if err != nil {
+		return fmt.Errorf("failed to prepare statement: %w", err)
+	}
+	defer uint64Statement.Close()
+	float64Statement, err := tx.Prepare(`
+		INSERT INTO metric_values (metric_id, timestamp, value)
+		VALUES (?, ?, union_value(float64 := ?))`)
+	if err != nil {
+		return fmt.Errorf("failed to prepare statement: %w", err)
+	}
+	defer float64Statement.Close()
+
+	// Prepare batched inserts into the 'metric_values' table. During this
+	// process, insert or update in the 'metrics' table if necessary.
+	for _, sample := range samples {
+		// Check if the metric is known and identical to the one in the database.
+		// Non identical metrics will preserve their internal ID, but the rest
+		// of the fields will be updated. Beware of locking order: 'stg.mutex'
+		// was locked before 'stg.cache.mutex'.
+		var metric *CachedMetric
+		stg.cache.mutex.RLock()
+		if m := stg.cache.metricsByName[sample.Name]; m != nil {
+			if m.Flag == sample.Flag && m.Format == sample.Format &&
+				m.Description == sample.Description {
+				metric = m
+			}
 		}
+		stg.cache.mutex.RUnlock()
 
-		var metricID int
-		if err := stg.db.QueryRow(`
+		// Insert / update in the 'metrics' table.
+		if metric == nil {
+			var class string
+			switch sample.Value.(type) {
+			case uint64:
+				class = "uint64"
+			case float64:
+				class = "float64"
+			default:
+				return ErrInvalidMetricType
+			}
+
+			var metricID int
+			if err := stg.db.QueryRow(`
			INSERT INTO metrics (id, name, flag, format, description, class)
			VALUES (
				COALESCE((SELECT id FROM metrics WHERE name = $1), NEXTVAL('metrics_seq')),
@@ -229,35 +256,43 @@ func (stg *Storage) PushMetricSample(
				format = excluded.format,
				description = excluded.description
			RETURNING id`,
-			name, flag, format, description, class).Scan(&metricID); err != nil {
-			return fmt.Errorf("failed to insert / update into 'metrics' table: %w", err)
+				sample.Name, sample.Flag, sample.Format, sample.Description, class).Scan(&metricID); err != nil {
+				return fmt.Errorf("failed to insert / update into 'metrics' table: %w", err)
+			}
+
+			metric = &CachedMetric{
+				ID:          metricID,
+				Name:        sample.Name,
+				Flag:        sample.Flag,
+				Format:      sample.Format,
+				Description: sample.Description,
+				Class:       class,
+			}
+
+			// Beware of locking order: 'stg.mutex' was locked before
+			// 'stg.cache.mutex'.
+			stg.cache.mutex.Lock()
+			stg.cache.metricsByID[metric.ID] = metric
+			stg.cache.metricsByName[metric.Name] = metric
+			stg.cache.mutex.Unlock()
 		}
 
-		metric = &CachedMetric{
-			ID:          metricID,
-			Name:        name,
-			Flag:        flag,
-			Format:      format,
-			Description: description,
-			Class:       class,
+		// Insert sample in the 'metric_values' table.
+		switch metric.Class {
+		case "uint64":
+			if _, err := uint64Statement.Exec(metric.ID, timestamp, sample.Value); err != nil {
+				return fmt.Errorf("failed to insert into 'metric_values' table: %w", err)
+			}
+		case "float64":
+			if _, err := float64Statement.Exec(metric.ID, timestamp, sample.Value); err != nil {
+				return fmt.Errorf("failed to insert into 'metric_values' table: %w", err)
+			}
 		}
-
-		// Beware of locking order: 'stg.mutex' was locked before
-		// 'stg.cache.mutex'.
-		stg.cache.mutex.Lock()
-		stg.cache.metricsByID[metric.ID] = metric
-		stg.cache.metricsByName[metric.Name] = metric
-		stg.cache.mutex.Unlock()
 	}
 
-	// Insert in the 'metric_values' table. Beware using 'metric' outside the
-	// lock is safe.
-	//nolint:gosec
-	if _, err := stg.db.Exec(`
-		INSERT INTO metric_values (metric_id, timestamp, value)
-		VALUES (?, ?, union_value(`+metric.Class+` := ?))`,
-		metric.ID, timestamp, value); err != nil {
-		return fmt.Errorf("failed to insert into 'metric_values' table: %w", err)
+	// Commit transaction.
+	if err := tx.Commit(); err != nil {
+		return fmt.Errorf("failed to commit transaction: %w", err)
 	}
 
 	// Update 'earliest' and 'latest' cache values. Beware of locking order: