Skip to content

Commit

Permalink
STAC-21017 Rewrite to separate resource exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
rb3ckers committed Apr 16, 2024
1 parent 2acefbf commit 2555b25
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 141 deletions.
2 changes: 2 additions & 0 deletions exporter/clickhousestsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type Config struct {
TTL time.Duration `mapstructure:"ttl"`
// Create the traces table on startup
CreateTracesTable bool `mapstructure:"create_traces_table"`
// Create the resources table on startup
CreateResourcesTable bool `mapstructure:"create_resources_table"`
}

const defaultDatabase = "default"
Expand Down
17 changes: 9 additions & 8 deletions exporter/clickhousestsexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "full"),
expected: &Config{
Endpoint: defaultEndpoint,
Database: "otel",
Username: "foo",
Password: "bar",
TTL: 72 * time.Hour,
LogsTableName: "otel_logs",
TracesTableName: "otel_traces",
MetricsTableName: "otel_metrics",
Endpoint: defaultEndpoint,
Database: "otel",
Username: "foo",
Password: "bar",
TTL: 72 * time.Hour,
LogsTableName: "otel_logs",
TracesTableName: "otel_traces",
MetricsTableName: "otel_metrics",
ResourcesTableName: "otel_resources",
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 5 * time.Second,
},
Expand Down
151 changes: 151 additions & 0 deletions exporter/clickhousestsexporter/exporter_resources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package clickhousestsexporter

import (
"context"
"database/sql"
"fmt"
"strings"
"time"

"github.com/google/uuid"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
"github.com/stackvista/sts-opentelemetry-collector/exporter/clickhousestsexporter/internal"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
conventions "go.opentelemetry.io/collector/semconv/v1.18.0"
"go.uber.org/zap"
)

type resourcesExporter struct {
client *sql.DB
insertSQL string

logger *zap.Logger
cfg *Config
}

type resourceModel struct {
resourceRef uuid.UUID
serviceName string
attributes map[string]string
}

func newResourceModel(resource pcommon.Resource) (*resourceModel, error) {
var serviceName string
if v, ok := resource.Attributes().Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}
resourceRef := pdatautil.MapHash(resource.Attributes())
refUUID, err := uuid.FromBytes(resourceRef[:])
if err != nil {
return nil, err
}

resAttr := attributesToMap(resource.Attributes())
return &resourceModel{
resourceRef: refUUID,
serviceName: serviceName,
attributes: resAttr,
}, nil
}

func newResourceExporter(logger *zap.Logger, cfg *Config) (*resourcesExporter, error) {
client, err := newClickhouseClient(cfg)
if err != nil {
return nil, err
}

return &resourcesExporter{
client: client,
logger: logger,

insertSQL: renderInsertResourcesSQL(cfg.ResourcesTableName),
cfg: cfg,
}, nil
}

// shutdown will shut down the exporter.
func (e *resourcesExporter) shutdown(_ context.Context) error {
if e.client != nil {
return e.client.Close()
}
return nil
}

func (e *resourcesExporter) start(ctx context.Context, _ component.Host) error {
if !e.cfg.CreateResourcesTable {
return nil
}

if err := createDatabase(ctx, e.cfg); err != nil {
return err
}

return createResourcesTable(ctx, e.cfg.TTLDays, e.cfg.TTL, e.cfg.ResourcesTableName, e.client)
}

func (e *resourcesExporter) InsertResources(ctx context.Context, resources []*resourceModel) error {
start := time.Now()

err := doWithTx(ctx, e.client, func(tx *sql.Tx) error {

resourceStatement, err := tx.PrepareContext(ctx, e.insertSQL)
if err != nil {
return fmt.Errorf("PrepareContext Traces:%w", err)
}
defer func() {
_ = resourceStatement.Close()
}()

for _, resource := range resources {
_, err := resourceStatement.ExecContext(ctx,
time.Now(),
resource.resourceRef,
resource.serviceName,
resource.attributes,
)
if err != nil {
return err
}
}
return nil
})
duration := time.Since(start)
e.logger.Debug("insert resources", zap.Int("records", len(resources)),
zap.String("cost", duration.String()))

return err
}

const (
// language=ClickHouse SQL
createResourcesTableSQL = `
CREATE TABLE IF NOT EXISTS %s (
Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
ResourceRef UUID,
ServiceName LowCardinality(String) CODEC(ZSTD(1)),
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
) ENGINE = ReplacingMergeTree
%s
ORDER BY (ResourceRef, toUnixTimestamp(Timestamp))
SETTINGS index_granularity=512, ttl_only_drop_parts = 1;
`
// language=ClickHouse SQL
insertResourcesSQLTemplate = `INSERT INTO %s (Timestamp, ResourceRef, ServiceName, ResourceAttributes) VALUES (?, ?, ?, ?)`
)

func createResourcesTable(ctx context.Context, ttlDays uint, ttl time.Duration, tableName string, db *sql.DB) error {
ttlExpr := internal.GenerateTTLExpr(ttlDays, ttl, "Timestamp")
if _, err := db.ExecContext(ctx, renderCreateResourcesTableSQL(ttlExpr, tableName)); err != nil {
return fmt.Errorf("exec create resources table sql: %w", err)
}
return nil
}

func renderInsertResourcesSQL(tableName string) string {
return fmt.Sprintf(strings.ReplaceAll(insertResourcesSQLTemplate, "'", "`"), tableName)
}

func renderCreateResourcesTableSQL(ttlExpr string, tableName string) string {
return fmt.Sprintf(createResourcesTableSQL, tableName, ttlExpr)
}
62 changes: 31 additions & 31 deletions exporter/clickhousestsexporter/exporter_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,16 @@ import (
"time"

_ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
"github.com/stackvista/sts-opentelemetry-collector/exporter/clickhousestsexporter/internal"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.18.0"
"go.uber.org/zap"
)

type tracesExporter struct {
client *sql.DB
insertSQL string
client *sql.DB
insertSQL string
resourceExporter *resourcesExporter

logger *zap.Logger
cfg *Config
Expand All @@ -32,16 +31,25 @@ func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error)
if err != nil {
return nil, err
}
resourceExporter, err := newResourceExporter(logger, cfg)
if err != nil {
return nil, err
}

return &tracesExporter{
client: client,
insertSQL: renderInsertTracesSQL(cfg),
logger: logger,
cfg: cfg,
client: client,
insertSQL: renderInsertTracesSQL(cfg),
resourceExporter: resourceExporter,
logger: logger,
cfg: cfg,
}, nil
}

func (e *tracesExporter) start(ctx context.Context, _ component.Host) error {
func (e *tracesExporter) start(ctx context.Context, host component.Host) error {
if err := e.resourceExporter.start(ctx, host); err != nil {
return err
}

if !e.cfg.CreateTracesTable {
return nil
}
Expand All @@ -50,15 +58,13 @@ func (e *tracesExporter) start(ctx context.Context, _ component.Host) error {
return err
}

if err := internal.CreateResourcesTable(ctx, e.cfg.TTLDays, e.cfg.TTL, e.cfg.ResourcesTableName, e.client); err != nil {
return err
}

return createTracesTable(ctx, e.cfg, e.client)
}

// shutdown will shut down the exporter.
func (e *tracesExporter) shutdown(_ context.Context) error {
func (e *tracesExporter) shutdown(ctx context.Context) error {
e.resourceExporter.shutdown(ctx)

if e.client != nil {
return e.client.Close()
}
Expand All @@ -81,31 +87,24 @@ func getSpanParentType(r ptrace.Span) string {

func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error {
start := time.Now()
resources := []*resourceModel{}

err := doWithTx(ctx, e.client, func(tx *sql.Tx) error {
traceStatement, err := tx.PrepareContext(ctx, e.insertSQL)
if err != nil {
return fmt.Errorf("PrepareContext Traces:%w", err)
}
resourceWriter, err := internal.NewResourceWriter(e.logger, ctx, tx, e.cfg.ResourcesTableName)
if err != nil {
return fmt.Errorf("init resource writer:%w", err)
}
defer func() {
_ = traceStatement.Close()
_ = resourceWriter.Close()
}()
for i := 0; i < td.ResourceSpans().Len(); i++ {
spans := td.ResourceSpans().At(i)
res := spans.Resource()
resAttr := attributesToMap(res.Attributes())
var serviceName string
if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}
resourceRef := pdatautil.MapHash(res.Attributes())
if err := resourceWriter.InsertResource(resourceRef, serviceName, resAttr); err != nil {
return fmt.Errorf("ExecContext Resources:%w", err)
res, err := newResourceModel(spans.Resource())
if err != nil {
return err
}
resources = append(resources, res)

for j := 0; j < spans.ScopeSpans().Len(); j++ {
rs := spans.ScopeSpans().At(j).Spans()
scopeName := spans.ScopeSpans().At(j).Scope().Name()
Expand All @@ -119,14 +118,14 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
spanParentType := getSpanParentType(r)
_, err = traceStatement.ExecContext(ctx,
r.StartTimestamp().AsTime(),
resourceRef,
res.resourceRef,
TraceIDToHexOrEmptyString(r.TraceID()),
SpanIDToHexOrEmptyString(r.SpanID()),
SpanIDToHexOrEmptyString(r.ParentSpanID()),
r.TraceState().AsRaw(),
r.Name(),
SpanKindStr(r.Kind()),
serviceName,
res.serviceName,
scopeName,
scopeVersion,
spanAttr,
Expand All @@ -153,6 +152,7 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
duration := time.Since(start)
e.logger.Debug("insert traces", zap.Int("records", td.SpanCount()),
zap.String("cost", duration.String()))
e.resourceExporter.InsertResources(ctx, resources)
return err
}

Expand Down Expand Up @@ -193,7 +193,7 @@ const (
createTracesTableSQL = `
CREATE TABLE IF NOT EXISTS %s (
Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
ResourceRef UInt128,
ResourceRef UUID,
TraceId String CODEC(ZSTD(1)),
SpanId String CODEC(ZSTD(1)),
ParentSpanId String CODEC(ZSTD(1)),
Expand Down
23 changes: 12 additions & 11 deletions exporter/clickhousestsexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ func createDefaultConfig() component.Config {
queueSettings.NumConsumers = 1

return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: queueSettings,
BackOffConfig: configretry.NewDefaultBackOffConfig(),
ConnectionParams: map[string]string{},
Database: defaultDatabase,
LogsTableName: "otel_logs",
TracesTableName: "otel_traces",
MetricsTableName: "otel_metrics",
ResourcesTableName: "otel_resources",
CreateTracesTable: true,
TTL: 0,
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: queueSettings,
BackOffConfig: configretry.NewDefaultBackOffConfig(),
ConnectionParams: map[string]string{},
Database: defaultDatabase,
LogsTableName: "otel_logs",
TracesTableName: "otel_traces",
MetricsTableName: "otel_metrics",
ResourcesTableName: "otel_resources",
CreateTracesTable: true,
CreateResourcesTable: true,
TTL: 0,
}
}

Expand Down
Loading

0 comments on commit 2555b25

Please sign in to comment.