-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
STAC-21205: refactor, address review comments
- Loading branch information
1 parent
72505e5
commit 2a22f99
Showing
6 changed files
with
319 additions
and
175 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
// Copyright StackState B.V. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
package ststopologyexporter | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/stackvista/sts-opentelemetry-collector/exporter/ststopologyexporter/internal" | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/pdata/pmetric" | ||
"go.uber.org/zap" | ||
) | ||
|
||
var ( | ||
errInternal = errors.New("internal error") | ||
) | ||
|
||
type topologyExporter struct { | ||
logger *zap.Logger | ||
httpClient http.Client | ||
cfg *Config | ||
} | ||
|
||
func newTopologyExporter(logger *zap.Logger, cfg component.Config) (*topologyExporter, error) { | ||
stsCfg, ok := cfg.(*Config) | ||
if !ok { | ||
return nil, fmt.Errorf("invalid config passed to stackstateexporter: %T", cfg) | ||
} | ||
httpClient := http.Client{ | ||
Timeout: 5 * time.Second, // TODO configure timeout | ||
} | ||
|
||
return &topologyExporter{logger: logger, httpClient: httpClient, cfg: stsCfg}, nil | ||
} | ||
func (t *topologyExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { | ||
log := t.logger | ||
|
||
componentsByApiKey := make(map[string]*internal.ComponentsCollection, 0) | ||
rms := md.ResourceMetrics() | ||
for i := 0; i < rms.Len(); i++ { | ||
rs := rms.At(i) | ||
resource := rs.Resource() | ||
sts_api_key_value, key_exists := resource.Attributes().Get("sts_api_key") | ||
if !key_exists { | ||
log.Warn("No sts_api_key attribute found on resource") | ||
continue | ||
} | ||
sts_api_key := sts_api_key_value.AsString() | ||
attrs := resource.Attributes() | ||
attrs.Remove("sts_api_key") | ||
collection, has_siblings := componentsByApiKey[sts_api_key] | ||
if !has_siblings { | ||
collection = internal.NewCollection() | ||
componentsByApiKey[sts_api_key] = collection | ||
} | ||
if !collection.AddResource(&attrs) { | ||
log.Warn("Skipping resource without necessary attributes") | ||
} | ||
} | ||
|
||
for apiKey, collection := range componentsByApiKey { | ||
components := collection.GetComponents() | ||
relations := collection.GetRelations() | ||
request := internal.IntakeTopology{ | ||
CollectionTimestamp: time.Now().UnixMilli(), | ||
InternalHostname: "sts-otel-collector", | ||
Topologies: []internal.Topology{{ | ||
Instance: internal.Instance{ | ||
Type: "opentelemetry", | ||
URL: "collector", | ||
}, | ||
Components: components, | ||
Relations: relations, | ||
}}, | ||
} | ||
jsonData, err := json.Marshal(request) | ||
if err != nil { | ||
log.Error("Can't encode api request to JSON", zap.Error(err)) | ||
return errInternal //it shouldn't happen, something is wrong with the implementation | ||
} | ||
|
||
req, err := http.NewRequest(http.MethodPost, t.cfg.Endpoint, bytes.NewReader(jsonData)) | ||
if err != nil { | ||
log.Error("Can't create topology intake request ", zap.Error(err)) | ||
return errInternal | ||
} | ||
req.Header.Add("Content-Type", "application/json") | ||
req.Header.Add("sts-api-key", apiKey) | ||
|
||
res, err := t.httpClient.Do(req) | ||
if err != nil { | ||
log.Error("Receiver endpoint returned an error ", zap.Error(err)) | ||
} | ||
|
||
if res.StatusCode == 403 { | ||
log.Error("API Key was not valid", zap.Error(err)) | ||
} | ||
log.Debug( | ||
fmt.Sprintf("Sent %d components for key ...%s (status %d)", | ||
len(components), | ||
apiKey[len(apiKey)-4:], | ||
res.StatusCode, | ||
), | ||
) | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
package internal | ||
|
||
import ( | ||
"fmt" | ||
|
||
"go.opentelemetry.io/collector/pdata/pcommon" | ||
) | ||
|
||
type ComponentsCollection struct { | ||
namespaces map[string]*Component | ||
services []*Component | ||
serviceInstances []*Component | ||
} | ||
|
||
func NewCollection() *ComponentsCollection { | ||
return &ComponentsCollection{ | ||
make(map[string]*Component, 0), | ||
make([]*Component, 0), | ||
make([]*Component, 0), | ||
} | ||
} | ||
|
||
func (c *ComponentsCollection) AddResource(attrs *pcommon.Map) bool { | ||
serviceName, ok := attrs.Get("service.name") | ||
if !ok { | ||
return false | ||
} | ||
serviceNamespace, ok := attrs.Get("service.namespace") | ||
if !ok { | ||
return false | ||
} | ||
instanceId, ok := attrs.Get("service.instance.id") | ||
var serviceInstanceId pcommon.Value | ||
if !ok { | ||
serviceInstanceId = serviceName | ||
} else { | ||
serviceInstanceId = instanceId | ||
} | ||
|
||
if _, ok := c.namespaces[serviceNamespace.AsString()]; !ok { | ||
c.namespaces[serviceNamespace.AsString()] = &Component{ | ||
fmt.Sprintf("urn:opentelemetry:namespace/%s", serviceNamespace.AsString()), | ||
ComponentType{ | ||
"namespace", | ||
}, | ||
newData(). | ||
withLayer("urn:stackpack:common:layer:applications"). | ||
withEnvironment(attrs). | ||
withName(attrs, "service.namespace"), | ||
} | ||
} | ||
c.services = append(c.services, &Component{ | ||
fmt.Sprintf("urn:opentelemetry:namespace/%s:service/%s", serviceNamespace.AsString(), serviceName.AsString()), | ||
ComponentType{ | ||
"service", | ||
}, | ||
newData(). | ||
withLayer("urn:stackpack:common:layer:services"). | ||
withEnvironment(attrs). | ||
withName(attrs, "service.name"). | ||
withVersion(attrs, "service.version"). | ||
withTags(attrs, "telemetry.sdk"), | ||
}) | ||
c.serviceInstances = append(c.serviceInstances, &Component{ | ||
fmt.Sprintf("urn:opentelemetry:namespace/%s:service/%s:serviceInstance/%s", serviceNamespace.AsString(), serviceName.AsString(), serviceInstanceId.AsString()), | ||
ComponentType{ | ||
"service_instance", | ||
}, | ||
newData(). | ||
withLayer("urn:stackpack:common:layer:containers"). | ||
withEnvironment(attrs). | ||
withName(attrs, "service.instance.id"). | ||
withVersion(attrs, "service.version"). | ||
withTags(attrs, "telemetry.sdk"). | ||
withTags(attrs, "telemetry.distro"). | ||
withProperties(attrs), | ||
}) | ||
return true | ||
} | ||
|
||
func (c *ComponentsCollection) GetComponents() []*Component { | ||
namespaces := make([]*Component, 0, len(c.namespaces)) | ||
for _, namespace := range c.namespaces { | ||
namespaces = append(namespaces, namespace) | ||
} | ||
return append( | ||
append( | ||
c.services, | ||
c.serviceInstances..., | ||
), | ||
namespaces..., | ||
) | ||
} | ||
|
||
func (c *ComponentsCollection) GetRelations() []*Relation { | ||
return make([]*Relation, 0) | ||
} | ||
|
||
func newData() *ComponentData { | ||
return &ComponentData{ | ||
Name: "", | ||
Version: "", | ||
Layer: "", | ||
Domain: "", | ||
Environment: "", | ||
Labels: []string{}, | ||
Tags: map[string]string{}, | ||
Properties: map[string]string{}, | ||
} | ||
} | ||
|
||
func (c *ComponentData) withLayer(layer string) *ComponentData { | ||
c.Layer = layer | ||
return c | ||
} | ||
|
||
func (c *ComponentData) withName(attrs *pcommon.Map, key string) *ComponentData { | ||
value, ok := attrs.Get(key) | ||
if ok { | ||
c.Name = value.AsString() | ||
} | ||
return c | ||
} | ||
|
||
func (c *ComponentData) withVersion(attrs *pcommon.Map, key string) *ComponentData { | ||
value, ok := attrs.Get(key) | ||
if ok { | ||
c.Version = value.AsString() | ||
} | ||
return c | ||
} | ||
|
||
func (c *ComponentData) withEnvironment(attrs *pcommon.Map) *ComponentData { | ||
value, ok := attrs.Get("deployment.environment") | ||
if ok { | ||
c.Environment = value.AsString() | ||
} | ||
return c | ||
} | ||
func (c *ComponentData) withTags(attrs *pcommon.Map, prefix string) *ComponentData { | ||
attrs.Range(func(k string, v pcommon.Value) bool { | ||
if len(k) >= len(prefix) && k[:len(prefix)] == prefix { | ||
c.Tags[k] = v.AsString() | ||
} | ||
return true | ||
}) | ||
return c | ||
} | ||
|
||
func (c *ComponentData) withProperties(attrs *pcommon.Map) *ComponentData { | ||
m := make(map[string]string, attrs.Len()) | ||
attrs.Range(func(k string, v pcommon.Value) bool { | ||
m[k] = v.AsString() | ||
return true | ||
}) | ||
c.Properties = m | ||
return c | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package internal | ||
|
||
type Instance struct { | ||
Type string `json:"type"` | ||
URL string `json:"url"` | ||
} | ||
|
||
type ComponentType struct { | ||
Name string `json:"name"` | ||
} | ||
|
||
type ComponentData struct { | ||
Name string `json:"name"` | ||
Version string `json:"version"` | ||
Layer string `json:"layer"` | ||
Domain string `json:"domain"` | ||
Environment string `json:"environment"` | ||
Labels []string `json:"labels"` | ||
Tags map[string]string `json:"tags"` | ||
Properties map[string]string `json:"properties"` | ||
} | ||
|
||
type Component struct { | ||
ExternalId string `json:"externalId"` | ||
Type ComponentType `json:"type"` | ||
Data *ComponentData `json:"data"` | ||
} | ||
|
||
type Relation struct{} | ||
|
||
type Topology struct { | ||
Instance Instance `json:"instance"` | ||
Components []*Component `json:"components"` | ||
Relations []*Relation `json:"relations"` | ||
} | ||
|
||
type IntakeTopology struct { | ||
CollectionTimestamp int64 `json:"collection_timestamp"` | ||
InternalHostname string `json:"internalHostname"` | ||
Topologies []Topology `json:"topologies"` | ||
} |
Oops, something went wrong.