Skip to content

Commit

Permalink
STAC-21209: add database components, skip virtual_node connections
Browse files Browse the repository at this point in the history
  • Loading branch information
fvlankvelt committed Apr 30, 2024
1 parent bc0b0fe commit 394bfbe
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 20 deletions.
5 changes: 5 additions & 0 deletions exporter/ststopologyexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ func (t *topologyExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metric
continue
}
connAttrs := m.Sum().DataPoints().At(0).Attributes()
connectionType, ok := connAttrs.Get("connection_type")
if !ok || connectionType.AsString() == "virtual_node" {
continue
}

client_api_key_value, client_key_exists := connAttrs.Get("client_sts_api_key")
var client_api_key string
if client_key_exists {
Expand Down
45 changes: 44 additions & 1 deletion exporter/ststopologyexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,40 @@ func TestExporter_pushResourcesData(t *testing.T) {
err := json.NewDecoder(req.Body).Decode(&payload)
require.NoError(t, err)
require.Equal(t, 1, len(payload.Topologies))

require.Equal(t, internal.Instance{
Type: "opentelemetry",
URL: "collector",
}, payload.Topologies[0].Instance)

require.Equal(t, 3, len(payload.Topologies[0].Components))
for _, component := range payload.Topologies[0].Components {
tags := component.Data.Tags
_, ok := tags["sts_api_key"]
require.False(t, ok)
}
require.Equal(t, 2, len(payload.Topologies[0].Relations))
for _, relation := range payload.Topologies[0].Relations {
tags := relation.Data.Tags
_, ok := tags["sts_api_key"]
require.False(t, ok)
}
res.WriteHeader(200)
}))
exporter := newTestExporter(t, testServer.URL)
err := exporter.ConsumeMetrics(context.TODO(), simpleMetrics())
require.NoError(t, err)
}

func TestExporter_skipVirtualNodes(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
require.Fail(t, "No request should be sent")
}))
exporter := newTestExporter(t, testServer.URL)
err := exporter.ConsumeMetrics(context.TODO(), virtualNodeMetrics())
require.NoError(t, err)
}

// simpleMetrics there will be added two ResourceMetrics and each of them have count data point
func simpleMetrics() pmetric.Metrics {
metrics := pmetric.NewMetrics()
Expand Down Expand Up @@ -59,7 +84,25 @@ func simpleMetrics() pmetric.Metrics {
ma.PutStr("client_service.namespace", "clientns")
ma.PutStr("server", "server")
ma.PutStr("server_service.namespace", "serverns")
ma.PutStr("connection_type", "unknown")
ma.PutStr("connection_type", "")
return metrics
}

func virtualNodeMetrics() pmetric.Metrics {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
sc := rm.ScopeMetrics().AppendEmpty()
sc.Scope().SetName("traces_service_graph")
ms := sc.Metrics().AppendEmpty()
ms.SetName("traces_service_graph_request_total")
ms.SetEmptySum().SetIsMonotonic(true)
ma := ms.Sum().DataPoints().AppendEmpty().Attributes()
ma.PutStr("client_sts_api_key", "APIKEY")
ma.PutStr("client", "client")
ma.PutStr("client_service.namespace", "clientns")
ma.PutStr("server", "server")
ma.PutStr("server_service.namespace", "serverns")
ma.PutStr("connection_type", "virtual_node")
return metrics
}

Expand Down
76 changes: 57 additions & 19 deletions exporter/ststopologyexporter/internal/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ import (

type ComponentsCollection struct {
namespaces map[string]*Component
services []*Component
serviceInstances []*Component
services map[string]*Component
serviceInstances map[string]*Component
relations map[string]*Relation
}

func NewCollection() *ComponentsCollection {
return &ComponentsCollection{
make(map[string]*Component, 0),
make([]*Component, 0),
make([]*Component, 0),
make(map[string]*Component, 0),
make(map[string]*Component, 0),
make(map[string]*Relation, 0),
}
}
Expand Down Expand Up @@ -53,7 +53,7 @@ func (c *ComponentsCollection) AddResource(attrs *pcommon.Map) bool {
}

serviceIdentifier := fmt.Sprintf("urn:opentelemetry:namespace/%s:service/%s", serviceNamespace.AsString(), serviceName.AsString())
c.services = append(c.services, &Component{
c.services[serviceIdentifier] = &Component{
serviceIdentifier,
ComponentType{
"service",
Expand All @@ -65,9 +65,9 @@ func (c *ComponentsCollection) AddResource(attrs *pcommon.Map) bool {
withVersion(attrs, "service.version").
withTag(attrs, "service.namespace").
withTagPrefix(attrs, "telemetry.sdk"),
})
}
serviceInstanceIdentifier := fmt.Sprintf("urn:opentelemetry:namespace/%s:service/%s:serviceInstance/%s", serviceNamespace.AsString(), serviceName.AsString(), serviceInstanceId.AsString())
c.serviceInstances = append(c.serviceInstances, &Component{
c.serviceInstances[serviceInstanceIdentifier] = &Component{
serviceInstanceIdentifier,
ComponentType{
"service-instance",
Expand All @@ -79,18 +79,17 @@ func (c *ComponentsCollection) AddResource(attrs *pcommon.Map) bool {
withVersion(attrs, "service.version").
withTag(attrs, "service.namespace").
withTags(attrs),
})
}
c.addRelation(serviceIdentifier, serviceInstanceIdentifier, "provided by")
return true
}

func (c *ComponentsCollection) AddConnection(attrs *pcommon.Map) bool {
reqAttrs := make(map[string]string, 4)
reqAttrs := make(map[string]string, 3)
for _, key := range []string{
"client",
"client_service.namespace",
"server",
"server_service.namespace",
"connection_type",
} {
value, ok := attrs.Get(key)
Expand All @@ -100,6 +99,17 @@ func (c *ComponentsCollection) AddConnection(attrs *pcommon.Map) bool {
reqAttrs[key] = value.AsString()
}

var connectionType string
if reqAttrs["connection_type"] == "" {
connectionType = "synchronous"
} else if reqAttrs["connection_type"] == "messaging_system" {
connectionType = "asynchronous"
} else if reqAttrs["connection_type"] == "database" {
connectionType = "database"
} else {
return false
}

instanceId, ok := attrs.Get("client_service.instance.id")
var clientInstanceId string
if !ok {
Expand All @@ -109,15 +119,35 @@ func (c *ComponentsCollection) AddConnection(attrs *pcommon.Map) bool {
}
sourceId := fmt.Sprintf("urn:opentelemetry:namespace/%s:service/%s:serviceInstance/%s", reqAttrs["client_service.namespace"], reqAttrs["client"], clientInstanceId)

instanceId, ok = attrs.Get("server_service.instance.id")
var serverInstanceId string
if !ok {
serverInstanceId = reqAttrs["server"]
var targetId string
if connectionType == "database" {
targetId = fmt.Sprintf("urn:opentelemetry:namespace/%s:service/%s:database/%s", reqAttrs["client_service.namespace"], reqAttrs["client"], reqAttrs["server"])
c.serviceInstances[targetId] = &Component{
targetId,
ComponentType{
"database",
},
newComponentData().
withLayer("urn:stackpack:common:layer:databases").
withName(attrs, "server"),
}
} else {
serverInstanceId = instanceId.AsString()
serverNamespace, ok := attrs.Get("server_service.namespace")
if !ok {
return false
}
instanceId, ok := attrs.Get("server_service.instance.id")
var serverInstanceId string
if !ok {
serverInstanceId = reqAttrs["server"]
} else {
serverInstanceId = instanceId.AsString()
}
targetId = fmt.Sprintf("urn:opentelemetry:namespace/%s:service/%s:serviceInstance/%s", serverNamespace.AsString(), reqAttrs["server"], serverInstanceId)
}
targetId := fmt.Sprintf("urn:opentelemetry:namespace/%s:service/%s:serviceInstance/%s", reqAttrs["server_service.namespace"], reqAttrs["server"], serverInstanceId)
c.addRelation(sourceId, targetId, reqAttrs["connection_type"])

c.addRelation(sourceId, targetId, connectionType)

return true
}

Expand All @@ -139,10 +169,18 @@ func (c *ComponentsCollection) GetComponents() []*Component {
for _, namespace := range c.namespaces {
namespaces = append(namespaces, namespace)
}
services := make([]*Component, 0, len(c.services))
for _, service := range c.services {
services = append(services, service)
}
instances := make([]*Component, 0, len(c.serviceInstances))
for _, instance := range c.serviceInstances {
instances = append(instances, instance)
}
return append(
append(
c.services,
c.serviceInstances...,
services,
instances...,
),
namespaces...,
)
Expand Down
119 changes: 119 additions & 0 deletions exporter/ststopologyexporter/internal/topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,122 @@ func TestTopology_addResource(t *testing.T) {
},
}, relations)
}

func TestTopology_addSynchronousConnection(t *testing.T) {
collection := NewCollection()
attrs := pcommon.NewMap()
attrs.PutStr("client", "frontend")
attrs.PutStr("client_service.namespace", "ns")
attrs.PutStr("server", "backend")
attrs.PutStr("server_service.namespace", "ns")
attrs.PutStr("connection_type", "")
ok := collection.AddConnection(&attrs)
require.True(t, ok)

relations := collection.GetRelations()
require.Equal(t, []*Relation{
{
ExternalId: "urn:opentelemetry:namespace/ns:service/frontend:serviceInstance/frontend-urn:opentelemetry:namespace/ns:service/backend:serviceInstance/backend",
SourceId: "urn:opentelemetry:namespace/ns:service/frontend:serviceInstance/frontend",
TargetId: "urn:opentelemetry:namespace/ns:service/backend:serviceInstance/backend",
Type: RelationType{
Name: "synchronous",
},
Data: &RelationData{
Tags: map[string]string{},
},
},
}, relations)
}

func TestTopology_failIncompleteConnection(t *testing.T) {
collection := NewCollection()
attrs := pcommon.NewMap()
attrs.PutStr("client", "frontend")
attrs.PutStr("client_service.namespace", "ns")
attrs.PutStr("server", "backend")
attrs.PutStr("connection_type", "")
ok := collection.AddConnection(&attrs)
require.False(t, ok)

attrs.Remove("client_service.namespace")
attrs.PutStr("server_service.namespace", "ns")
ok = collection.AddConnection(&attrs)
require.False(t, ok)

attrs.Remove("client")
attrs.PutStr("client_service.namespace", "ns")
ok = collection.AddConnection(&attrs)
require.False(t, ok)
}

func TestTopology_addAsynchronousConnection(t *testing.T) {
collection := NewCollection()
attrs := pcommon.NewMap()
attrs.PutStr("client", "frontend")
attrs.PutStr("client_service.namespace", "ns")
attrs.PutStr("server", "backend")
attrs.PutStr("server_service.namespace", "ns")
attrs.PutStr("connection_type", "messaging_system")
ok := collection.AddConnection(&attrs)
require.True(t, ok)

relations := collection.GetRelations()
require.Equal(t, []*Relation{
{
ExternalId: "urn:opentelemetry:namespace/ns:service/frontend:serviceInstance/frontend-urn:opentelemetry:namespace/ns:service/backend:serviceInstance/backend",
SourceId: "urn:opentelemetry:namespace/ns:service/frontend:serviceInstance/frontend",
TargetId: "urn:opentelemetry:namespace/ns:service/backend:serviceInstance/backend",
Type: RelationType{
Name: "asynchronous",
},
Data: &RelationData{
Tags: map[string]string{},
},
},
}, relations)
}

func TestTopology_addDatabase(t *testing.T) {
collection := NewCollection()
attrs := pcommon.NewMap()
attrs.PutStr("client", "frontend")
attrs.PutStr("client_service.namespace", "ns")
attrs.PutStr("server", "mydb")
attrs.PutStr("connection_type", "database")
ok := collection.AddConnection(&attrs)
require.True(t, ok)

relations := collection.GetRelations()
require.Equal(t, []*Relation{
{
ExternalId: "urn:opentelemetry:namespace/ns:service/frontend:serviceInstance/frontend-urn:opentelemetry:namespace/ns:service/frontend:database/mydb",
SourceId: "urn:opentelemetry:namespace/ns:service/frontend:serviceInstance/frontend",
TargetId: "urn:opentelemetry:namespace/ns:service/frontend:database/mydb",
Type: RelationType{
Name: "database",
},
Data: &RelationData{
Tags: map[string]string{},
},
},
}, relations)

components := collection.GetComponents()
require.Equal(t, []*Component{
{
ExternalId: "urn:opentelemetry:namespace/ns:service/frontend:database/mydb",
Type: ComponentType{
Name: "database",
},
Data: &ComponentData{
Name: "mydb",
Version: "",
Layer: "urn:stackpack:common:layer:databases",
Domain: "",
Environment: "",
Tags: map[string]string{},
},
},
}, components)
}

0 comments on commit 394bfbe

Please sign in to comment.