Skip to content

Commit

Permalink
moved broker to kubefox spans
Browse files Browse the repository at this point in the history
  • Loading branch information
xadhatter committed Apr 1, 2024
1 parent 18284ab commit ebfc874
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 172 deletions.
81 changes: 34 additions & 47 deletions components/broker/engine/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ import (
"github.com/xigxog/kubefox/api"
"github.com/xigxog/kubefox/build"
"github.com/xigxog/kubefox/components/broker/config"
"github.com/xigxog/kubefox/components/broker/telemetry"
brktel "github.com/xigxog/kubefox/components/broker/telemetry"
"github.com/xigxog/kubefox/core"
"github.com/xigxog/kubefox/logkf"
"github.com/xigxog/kubefox/telemetry"
"github.com/xigxog/kubefox/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
tracev1 "go.opentelemetry.io/proto/otlp/trace/v1"
authv1 "k8s.io/api/authentication/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -60,7 +58,8 @@ type Engine interface {
}

type Broker interface {
UploadTraces(context.Context, []*v1.ResourceSpans) error
AddSpans(*core.Component, ...*telemetry.Span)
AddResourceSpans([]*tracev1.ResourceSpans)
AuthorizeComponent(context.Context, *Metadata) error
Subscribe(context.Context, *SubscriptionConf) (ReplicaSubscription, error)
RecvEvent(evt *core.Event, receiver Receiver) *BrokerEventContext
Expand All @@ -76,8 +75,8 @@ type broker struct {
httpClient *HTTPClient
k8sClient client.Client

healthSrv *telemetry.HealthServer
telClient *telemetry.Client
healthSrv *brktel.HealthServer
telClient *brktel.Client

subMgr SubscriptionMgr
recvCh chan *BrokerEventContext
Expand Down Expand Up @@ -106,8 +105,8 @@ func New() Engine {
ctx, cancel := context.WithCancel(context.Background())
brk := &broker{
comp: comp,
healthSrv: telemetry.NewHealthServer(),
telClient: telemetry.NewClient(),
healthSrv: brktel.NewHealthServer(),
telClient: brktel.NewClient(),
subMgr: NewManager(),
recvCh: make(chan *BrokerEventContext),
store: NewStore(),
Expand Down Expand Up @@ -149,9 +148,9 @@ func (brk *broker) Start() {
}

// if config.TelemetryAddr != "false" {
if err := brk.telClient.Start(ctx, brk.comp); err != nil {
brk.shutdown(ExitCodeTelemetry, err)
}
// if err := brk.telClient.Start(ctx, brk.comp); err != nil {
// brk.shutdown(ExitCodeTelemetry, err)
// }
// }

if err := brk.natsClient.Connect(ctx); err != nil {
Expand Down Expand Up @@ -188,8 +187,12 @@ func (brk *broker) Start() {
brk.shutdown(0, nil)
}

func (brk *broker) UploadTraces(ctx context.Context, protoSpans []*v1.ResourceSpans) error {
return brk.telClient.UploadTraces(ctx, protoSpans)
func (brk *broker) AddResourceSpans(spans []*tracev1.ResourceSpans) {
brk.telClient.AddResourceSpans(spans)
}

func (brk *broker) AddSpans(comp *core.Component, spans ...*telemetry.Span) {
brk.telClient.AddSpans(comp, spans...)
}

func (brk *broker) Subscribe(ctx context.Context, conf *SubscriptionConf) (ReplicaSubscription, error) {
Expand Down Expand Up @@ -289,25 +292,13 @@ func (brk *broker) RecvEvent(evt *core.Event, receiver Receiver) *BrokerEventCon
parentCtx, cancel := context.WithCancelCause(context.Background())
ctx, _ := context.WithTimeoutCause(parentCtx, evt.TTL(), core.ErrTimeout())

// TODO move to kubefox telemetry and implements batch exporter.
if tp := evt.ParentSpan; tp != nil {
ts, _ := trace.ParseTraceState(tp.TraceState)
ctx = trace.ContextWithRemoteSpanContext(ctx, trace.NewSpanContext(
trace.SpanContextConfig{
TraceID: trace.TraceID(tp.TraceId),
SpanID: trace.SpanID(tp.SpanId),
TraceState: ts,
TraceFlags: trace.TraceFlags(tp.Flags),
},
))
}

evtCtx := &BrokerEventContext{
Context: ctx,
Cancel: cancel,
Event: evt,
Receiver: receiver,
ReceivedAt: time.Now(),
Span: telemetry.StartSpan("receive event", evt.ParentSpan),
}

go func() {
Expand Down Expand Up @@ -370,34 +361,29 @@ func (brk *broker) routeEvent(ctx *BrokerEventContext) (err error) {

ctx.Log.Debugf("routing event from receiver '%s'", ctx.Receiver)

var routeSpan trace.Span
ctx.Context, routeSpan = telemetry.Tracer.Start(ctx.Context,
routeSpan := ctx.Span.StartChildSpan(
fmt.Sprintf("route %s from %s", ctx.Event.Category, ctx.Event.Source.Key()),
trace.WithAttributes(
attribute.String("kubefox.event.id", ctx.Event.Id),
attribute.String("kubefox.source", ctx.Event.Source.Key()),
),
)
telemetry.Attr(telemetry.AttrKeyEventId, ctx.Event.Id),
telemetry.Attr(telemetry.AttrKeyEventSourceName, ctx.Event.Source.Key()))

defer routeSpan.End()

_, span := telemetry.Tracer.Start(ctx.Context, "find target")
findSpan := routeSpan.StartChildSpan("find target")
if err = brk.validateEvent(ctx); err == nil { //success
err = brk.findTarget(ctx)
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
span.End()
findSpan.End(err)
return
}
span.End()
findSpan.End()

// Update log and span attributes after matching.
routeSpan.SetAttributes(attribute.String("kubefox.target", ctx.Event.Target.Key()))
routeSpan.SetAttributes(telemetry.Attr(telemetry.AttrKeyEventTargetName, ctx.Event.Target.Key()))
ctx.Log = ctx.Log.WithEvent(ctx.Event)
ctx.Log.Debugf("matched event to target '%s'", ctx.Event.Target)
ctx.Log.Debugf("matched event to target '%s'", ctx.Event.Target.Key())

_, span = telemetry.Tracer.Start(ctx.Context, fmt.Sprintf("send to %s", ctx.Event.Target.Key()))
sendSpan := routeSpan.StartChildSpan(fmt.Sprintf("send to %s", ctx.Event.Target.Key()))
if ctx.TargetAdapter != nil {
// TODO move http client to adapter
err = brk.httpClient.SendEvent(ctx)
Expand All @@ -420,12 +406,13 @@ func (brk *broker) routeEvent(ctx *BrokerEventContext) (err error) {
}
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
span.End()
sendSpan.End(err)
return
}
span.End()
sendSpan.End()

ctx.Span.End()
brk.AddSpans(brk.comp, ctx.Span, routeSpan, findSpan, sendSpan)

return
}
Expand Down
9 changes: 8 additions & 1 deletion components/broker/engine/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,14 @@ func (srv *GRPCServer) subscribe(stream grpc.Broker_SubscribeServer) (ReplicaSub
}

func (srv *GRPCServer) Export(ctx context.Context, req *otelgrpc.ExportTraceServiceRequest) (*otelgrpc.ExportTraceServiceResponse, error) {
return &otelgrpc.ExportTraceServiceResponse{}, srv.brk.UploadTraces(ctx, req.ResourceSpans)
// TODO auth
if req == nil {
return &otelgrpc.ExportTraceServiceResponse{}, nil
}

srv.brk.AddResourceSpans(req.ResourceSpans)

return &otelgrpc.ExportTraceServiceResponse{}, nil
}

func parseMD(stream grpc.Broker_SubscribeServer) (*Metadata, error) {
Expand Down
2 changes: 2 additions & 0 deletions components/broker/engine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/xigxog/kubefox/api/kubernetes/v1alpha1"
"github.com/xigxog/kubefox/core"
"github.com/xigxog/kubefox/logkf"
"github.com/xigxog/kubefox/telemetry"
"google.golang.org/protobuf/types/known/structpb"
)

Expand Down Expand Up @@ -52,6 +53,7 @@ type BrokerEventContext struct {

TargetAdapter common.Adapter

Span *telemetry.Span
Log *logkf.Logger
Cancel context.CancelCauseFunc

Expand Down
Loading

0 comments on commit ebfc874

Please sign in to comment.