Skip to content

Commit

Permalink
Merge branch 'dev' (0.4.4-1)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosabalde committed Jan 31, 2025
2 parents 82b94b8 + df2d96c commit 65ab73a
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 93 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
- 0.4.4-1 (2025-01-31):
+ Fixed major I/O performance issue when writing metrics to DuckDB.

- 0.4.3-1 (2025-01-30):
+ Fixed zoom reset in charts.

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ SHELL := /bin/bash
ROOT := $(abspath $(dir $(lastword $(MAKEFILE_LIST))))
UMASK := 022

VERSION := 0.4.3
VERSION := 0.4.4
ITERATION := 1
REVISION := $(shell cd '$(ROOT)' && git rev-parse --short HEAD)
ENVIRONMENT ?= production
Expand Down
15 changes: 15 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@
#
# $ docker compose down --volumes --remove-orphans
#
# $ IMAGE=$(docker build \
# --file extras/github/docker/Dockerfile-jammy \
# --build-arg GO_VERSION=1.23.3 \
# .)
# $ docker run \
# --rm \
# --volume .:/workspace \
#     --workdir /workspace \
# $IMAGE \
# bash -c '\
# git config --global --add safe.directory /workspace; \
# export PATH=/usr/local/go/bin:$PATH; \
# export PLATFORM=jammy; \
# make package'
#

name: varnishmon

Expand Down
68 changes: 28 additions & 40 deletions pkg/workers/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package workers

import (
"context"
"errors"
"sync"
"time"

duckdb "github.com/marcboeker/go-duckdb"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/stone.code/assert"

Expand Down Expand Up @@ -59,12 +57,12 @@ func NewArchiverWorker(
pushCompleted: prometheus.NewCounter(
prometheus.CounterOpts{
Name: "archiver_push_completed_total",
Help: "Successful pushes of metrics by the archiver worker",
Help: "Successful pushes of batches of samples by the archiver worker",
}),
pushFailed: prometheus.NewCounter(
prometheus.CounterOpts{
Name: "archiver_push_failed_total",
Help: "Failed pushes of metrics by the archiver worker",
Help: "Failed pushes of batches of samples by the archiver worker",
}),
}

Expand Down Expand Up @@ -96,6 +94,8 @@ func (aw *ArchiverWorker) run() {
case <-aw.ctx.Done():
return
case metrics := <-aw.metricsQueue:
batch := make([]*storage.MetricSample, 0, len(metrics.Items))

for name, details := range metrics.Items {
// Check if this is the first time seeing the metric.
previousMetric, ok := aw.lastMetrics[name]
Expand All @@ -115,9 +115,9 @@ func (aw *ArchiverWorker) run() {
// Otherwise, the rate per second of an uptime would be
// pretty much useless.

// Skip if this is the first time seeing the metric: counters
// are stored as rates calculated based on the previously seen
// value.
// Skip if this is the first time seeing the metric:
// counters are stored as rates calculated based on the
// previously seen value.
if previousMetric == nil {
continue
}
Expand Down Expand Up @@ -149,8 +149,8 @@ func (aw *ArchiverWorker) run() {
continue
}

// Transform the 'uint64' value of the metric by dropping the
// highest bit. See: https://github.com/golang/go/issues/6113.
// Transform the 'uint64' value of the metric by dropping
// the highest bit. See: https://github.com/golang/go/issues/6113.
if !details.IsBitmap() && details.Value&0x8000000000000000 != 0 {
aw.truncatedSamples.Inc()
}
Expand All @@ -166,37 +166,25 @@ func (aw *ArchiverWorker) run() {
previousMetric.value = details.Value
}

// Store the metric.
if err := aw.storage.PushMetricSample(
name, metrics.Timestamp, details.Flag, details.Format,
details.Description, value); err != nil {
aw.pushFailed.Inc()

aw.app.Cfg().Log().Error().
Err(err).
Str("name", name).
Str("flag", details.Flag).
Str("format", details.Format).
Interface("value", value).
Msg("Failed to store metric!")

// On DuckDB errors, discard remaining metrics. Typically,
// when DuckDB fails to store a metric, it indicates a
// permanent issue (e.g., memory allocation failure). It is
// better to stop early to avoid flooding the logs with one
                        // error entry for each metric in the batch and to prevent
// further damage like a CPU spike.
var duckdbErr *duckdb.Error
if errors.As(err, &duckdbErr) {
aw.app.Cfg().Log().Error().
Interface("type", duckdbErr.Type).
Interface("msg", duckdbErr.Msg).
Msg("Hitting a DuckDB error, stopping further processing of current batch of metrics!")
break
}
} else {
aw.pushCompleted.Inc()
}
// Append the metric sample to the batch.
batch = append(batch, &storage.MetricSample{
Name: name,
Flag: details.Flag,
Format: details.Format,
Description: details.Description,
Value: value,
})
}

if err := aw.storage.PushMetricSamples(metrics.Timestamp, batch); err != nil {
aw.pushFailed.Inc()
aw.app.Cfg().Log().Error().
Err(err).
Time("timestamp", metrics.Timestamp).
Int("count", len(batch)).
Msg("Failed to store batch of samples!")
} else {
aw.pushCompleted.Inc()
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/workers/storage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ func (stg *Storage) unsafeConfigureDB() {
SET memory_limit = '%dMiB';
SET threads = '%d';
SET temp_directory = '%s';
SET max_temp_directory_size = '%dMiB';`,
SET max_temp_directory_size = '%dMiB';
SET preserve_insertion_order = false;`,
stg.app.Cfg().DBMemoryLimit(),
stg.app.Cfg().DBThreads(),
stg.app.Cfg().DBTempDirectory(),
Expand Down
7 changes: 7 additions & 0 deletions pkg/workers/storage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ type CachedMetric struct {
Description string
Class string
}
// MetricSample is a single metric observation handed to the storage layer
// for batched insertion (see PushMetricSamples). Value must hold either a
// uint64 or a float64; any other dynamic type is rejected by the storage
// layer with ErrInvalidMetricType.
type MetricSample struct {
	Name        string // metric name, unique key in the 'metrics' table
	Flag        string // varnishstat flag — presumably counter vs. gauge semantics; TODO confirm
	Format      string // varnishstat format hint — TODO confirm semantics
	Description string // human-readable description stored in the 'metrics' table
	Value       any    // sample value: uint64 or float64
}

func NewStorage(app Application) *Storage {
// Create instance.
Expand Down
137 changes: 86 additions & 51 deletions pkg/workers/storage/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,40 +186,67 @@ func (stg *Storage) GetMetric(
}, nil
}

func (stg *Storage) PushMetricSample(
name string, timestamp time.Time, flag, format, description string,
value any) error {
// Check if the metric is known and identical to the one in the database.
// Non identical metrics will preserve their internal ID, but the rest of
// the fields will be updated.
var metric *CachedMetric
stg.cache.mutex.RLock()
if m := stg.cache.metricsByName[name]; m != nil {
if m.Flag == flag && m.Format == format && m.Description == description {
metric = m
}
}
stg.cache.mutex.RUnlock()

func (stg *Storage) PushMetricSamples(timestamp time.Time, samples []*MetricSample) error {
// This is a write operation on 'db' but a read lock is intentionally used.
// See the note on the 'Storage' type for more information.
stg.mutex.RLock()
defer stg.mutex.RUnlock()

// Insert / update in the 'metrics' table.
if metric == nil {
var class string
switch value.(type) {
case uint64:
class = "uint64"
case float64:
class = "float64"
default:
return ErrInvalidMetricType
// Using a transaction is crucial to batch the inserts and avoid performance
// penalties.
tx, err := stg.db.Begin()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback() //nolint:errcheck

// Build prepared statements for batch inserts, one for each metric class.
uint64Statement, err := tx.Prepare(`
INSERT INTO metric_values (metric_id, timestamp, value)
VALUES (?, ?, union_value(uint64 := ?))`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer uint64Statement.Close()
float64Statement, err := tx.Prepare(`
INSERT INTO metric_values (metric_id, timestamp, value)
VALUES (?, ?, union_value(float64 := ?))`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer float64Statement.Close()

// Prepare batched inserts into the 'metric_values' table. During this
// process, insert or update in the 'metrics' table if necessary.
for _, sample := range samples {
// Check if the metric is known and identical to the one in the database.
// Non identical metrics will preserve their internal ID, but the rest
// of the fields will be updated. Beware of locking order: 'stg.mutex'
// was locked before 'stg.cache.mutex'.
var metric *CachedMetric
stg.cache.mutex.RLock()
if m := stg.cache.metricsByName[sample.Name]; m != nil {
if m.Flag == sample.Flag && m.Format == sample.Format &&
m.Description == sample.Description {
metric = m
}
}
stg.cache.mutex.RUnlock()

var metricID int
if err := stg.db.QueryRow(`
// Insert / update in the 'metrics' table.
if metric == nil {
var class string
switch sample.Value.(type) {
case uint64:
class = "uint64"
case float64:
class = "float64"
default:
return ErrInvalidMetricType
}

var metricID int
if err := stg.db.QueryRow(`
INSERT INTO metrics (id, name, flag, format, description, class)
VALUES (
COALESCE((SELECT id FROM metrics WHERE name = $1), NEXTVAL('metrics_seq')),
Expand All @@ -229,35 +256,43 @@ func (stg *Storage) PushMetricSample(
format = excluded.format,
description = excluded.description
RETURNING id`,
name, flag, format, description, class).Scan(&metricID); err != nil {
return fmt.Errorf("failed to insert / update into 'metrics' table: %w", err)
sample.Name, sample.Flag, sample.Format, sample.Description, class).Scan(&metricID); err != nil {
return fmt.Errorf("failed to insert / update into 'metrics' table: %w", err)
}

metric = &CachedMetric{
ID: metricID,
Name: sample.Name,
Flag: sample.Flag,
Format: sample.Format,
Description: sample.Description,
Class: class,
}

// Beware of locking order: 'stg.mutex' was locked before
// 'stg.cache.mutex'.
stg.cache.mutex.Lock()
stg.cache.metricsByID[metric.ID] = metric
stg.cache.metricsByName[metric.Name] = metric
stg.cache.mutex.Unlock()
}

metric = &CachedMetric{
ID: metricID,
Name: name,
Flag: flag,
Format: format,
Description: description,
Class: class,
// Insert sample in the 'metric_values' table.
switch metric.Class {
case "uint64":
if _, err := uint64Statement.Exec(metric.ID, timestamp, sample.Value); err != nil {
return fmt.Errorf("failed to insert into 'metric_values' table: %w", err)
}
case "float64":
if _, err := float64Statement.Exec(metric.ID, timestamp, sample.Value); err != nil {
return fmt.Errorf("failed to insert into 'metric_values' table: %w", err)
}
}

// Beware of locking order: 'stg.mutex' was locked before
// 'stg.cache.mutex'.
stg.cache.mutex.Lock()
stg.cache.metricsByID[metric.ID] = metric
stg.cache.metricsByName[metric.Name] = metric
stg.cache.mutex.Unlock()
}

    // Insert in the 'metric_values' table. Note that using 'metric' outside
    // the lock is safe.
//nolint:gosec
if _, err := stg.db.Exec(`
INSERT INTO metric_values (metric_id, timestamp, value)
VALUES (?, ?, union_value(`+metric.Class+` := ?))`,
metric.ID, timestamp, value); err != nil {
return fmt.Errorf("failed to insert into 'metric_values' table: %w", err)
// Commit transaction.
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

// Update 'earliest' and 'latest' cache values. Beware of locking order:
Expand Down

0 comments on commit 65ab73a

Please sign in to comment.