Skip to content

Commit

Permalink
STAC-20729 Configure ReplacingMergeTree to run with replication
Browse files Browse the repository at this point in the history
  • Loading branch information
LukaszMarchewka committed Oct 8, 2024
1 parent d29f7d8 commit 96a35aa
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
16 changes: 16 additions & 0 deletions exporter/clickhousestsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type Config struct {
TTL time.Duration `mapstructure:"ttl"`
// TableEngine is the table engine to use. default is `MergeTree()`.
TableEngine TableEngine `mapstructure:"table_engine"`
// DeduplicatingTableEngine is the table engine to use that it removes duplicates entries with the same sorting key . default is `ReplacingMergeTree()`.
DeduplicatingTableEngine TableEngine `mapstructure:"deduplicating_table_engine"`
// ClusterName if set will append `ON CLUSTER` with the provided name when creating tables.
ClusterName string `mapstructure:"cluster_name"`
// Create the traces table on startup
Expand All @@ -63,6 +65,7 @@ type TableEngine struct {

const defaultDatabase = "default"
const defaultTableEngineName = "MergeTree"
const defaultDeduplicatingTableEngineName = "ReplacingMergeTree"

var (
errConfigNoEndpoint = errors.New("endpoint must be specified")
Expand Down Expand Up @@ -166,6 +169,19 @@ func (cfg *Config) TableEngineString() string {
return fmt.Sprintf("%s(%s)", engine, params)
}

// DeduplicatingTableEngineString generates the ENGINE string that it removes duplicates entries with the same sorting key
func (cfg *Config) DeduplicatingTableEngineString() string {
engine := cfg.DeduplicatingTableEngine.Name
params := cfg.DeduplicatingTableEngine.Params

if cfg.DeduplicatingTableEngine.Name == "" {
engine = defaultDeduplicatingTableEngineName
params = ""
}

return fmt.Sprintf("%s(%s)", engine, params)
}

// ClusterString generates the ON CLUSTER string. Returns empty string if not set.
func (cfg *Config) ClusterString() string {
if cfg.ClusterName == "" {
Expand Down
16 changes: 8 additions & 8 deletions exporter/clickhousestsexporter/exporter_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (e *resourcesExporter) start(ctx context.Context, _ component.Host) error {
return err
}

return createResourcesTable(ctx, e.cfg.TTLDays, e.cfg.TTL, e.cfg.ResourcesTableName, e.client)
return createResourcesTable(ctx, e.cfg, e.client)
}

func (e *resourcesExporter) InsertResources(ctx context.Context, resources []*resourceModel) error {
Expand Down Expand Up @@ -112,11 +112,11 @@ func (e *resourcesExporter) InsertResources(ctx context.Context, resources []*re
const (
// language=ClickHouse SQL
createResourcesTableSQL = `
CREATE TABLE IF NOT EXISTS %s (
CREATE TABLE IF NOT EXISTS %s %s (
Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
ResourceRef UUID,
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
) ENGINE = ReplacingMergeTree
) ENGINE = %s
%s
ORDER BY (ResourceRef, toUnixTimestamp(Timestamp))
SETTINGS index_granularity=512, ttl_only_drop_parts = 1;
Expand All @@ -125,9 +125,8 @@ SETTINGS index_granularity=512, ttl_only_drop_parts = 1;
insertResourcesSQLTemplate = `INSERT INTO %s (Timestamp, ResourceRef, ResourceAttributes) VALUES (?, ?, ?)`
)

func createResourcesTable(ctx context.Context, ttlDays uint, ttl time.Duration, tableName string, db *sql.DB) error {
ttlExpr := internal.GenerateTTLExpr(ttlDays, ttl, "Timestamp")
if _, err := db.ExecContext(ctx, renderCreateResourcesTableSQL(ttlExpr, tableName)); err != nil {
func createResourcesTable(ctx context.Context, cfg *Config, db *sql.DB) error {
if _, err := db.ExecContext(ctx, renderCreateResourcesTableSQL(cfg)); err != nil {
return fmt.Errorf("exec create resources table sql: %w", err)
}
return nil
Expand All @@ -137,6 +136,7 @@ func renderInsertResourcesSQL(tableName string) string {
return fmt.Sprintf(strings.ReplaceAll(insertResourcesSQLTemplate, "'", "`"), tableName)
}

func renderCreateResourcesTableSQL(ttlExpr string, tableName string) string {
return fmt.Sprintf(createResourcesTableSQL, tableName, ttlExpr)
func renderCreateResourcesTableSQL(cfg *Config) string {
ttlExpr := internal.GenerateTTLExpr(cfg.TTLDays, cfg.TTL, "Timestamp")
return fmt.Sprintf(createResourcesTableSQL, cfg.ResourcesTableName, cfg.ClusterString(), cfg.DeduplicatingTableEngineString(), ttlExpr)
}

0 comments on commit 96a35aa

Please sign in to comment.