Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(#90): adds support for finished span handler. #135

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 69 additions & 6 deletions span_implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
type spanImpl struct {
mtx sync.RWMutex
model.SpanModel
tracer *Tracer
mustCollect int32 // used as atomic bool (1 = true, 0 = false)
flushOnFinish bool
tracer *Tracer
mustCollect int32 // used as atomic bool (1 = true, 0 = false)
flushOnFinish bool
finishedSpanHandler func(*model.SpanModel) bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi we have a list of these in java (ex one for metrics, one for dependency graph)

}

func (s *spanImpl) Context() model.SpanContext {
Expand Down Expand Up @@ -77,21 +78,83 @@ func (s *spanImpl) Tag(key, value string) {
}

func (s *spanImpl) Finish() {
d := time.Since(s.Timestamp)
if atomic.CompareAndSwapInt32(&s.mustCollect, 1, 0) {
s.Duration = time.Since(s.Timestamp)
if s.flushOnFinish {
s.mtx.Lock()
s.Duration = d
s.mtx.Unlock()

shouldRecord := true
if s.finishedSpanHandler != nil {
shouldRecord = s.finishedSpanHandler(&s.SpanModel)
}

if shouldRecord && s.flushOnFinish {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume flushOnFinish is related to sampling bit. if so, sg

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure that is the case. I think it is more for when you want to add more stuff after a span is finished. See https://github.com/openzipkin/zipkin-go/blob/master/span_options.go#L79

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha. yeah I think this will want sampled local support before people start coding to it. Otherwise, later they might get a firehose and not notice. here's the docs from the java side.

/**
 * Triggered on each finished span except when spans that are {@link Span#isNoop() no-op}.
 *
 * <p>{@link TraceContext#sampled() Sampled spans} hit this stage before reporting to Zipkin.
 * This means changes to the mutable span will reflect in reported data.
 *
 * <p>When Zipkin's reporter is {@link zipkin2.reporter.Reporter#NOOP} or the context is
 * unsampled, this will still receive spans where {@link TraceContext#sampledLocal()} is true.
 *
 * @see #alwaysSampleLocal()
 */

https://github.com/openzipkin/brave/blob/5a17fc018613958db00cc7b6951826e95f1b9d6c/brave/src/main/java/brave/handler/FinishedSpanHandler.java#L30

s.tracer.reporter.Send(s.SpanModel)
}
return
}

var hasDuration bool
s.mtx.Lock()
hasDuration = s.Duration == 0
s.mtx.Unlock()

if hasDuration {
// it was not meant to be recorded because the CompareAndSwap
// did not happen (meaning that s.mustCollect is 0 at this moment)
// and duration is still zero value.
s.mtx.Lock()
s.Duration = d
s.mtx.Unlock()

shouldRecord := false
if s.finishedSpanHandler != nil {
shouldRecord = s.finishedSpanHandler(&s.SpanModel)
}

if shouldRecord && s.flushOnFinish {
s.tracer.reporter.Send(s.SpanModel)
}
} // else the span is being finished concurrently by another goroutine
}

func (s *spanImpl) FinishedWithDuration(d time.Duration) {
if atomic.CompareAndSwapInt32(&s.mustCollect, 1, 0) {
s.mtx.Lock()
s.Duration = d
if s.flushOnFinish {
s.mtx.Unlock()

shouldRecord := true
if s.finishedSpanHandler != nil {
shouldRecord = s.finishedSpanHandler(&s.SpanModel)
}

if shouldRecord && s.flushOnFinish {
s.tracer.reporter.Send(s.SpanModel)
}
return
}

var hasDuration bool
s.mtx.Lock()
hasDuration = s.Duration == 0
s.mtx.Unlock()

if hasDuration {
// it was not meant to be recorded because the CompareAndSwap
// did not happen (meaning that s.mustCollect is 0 at this moment)
// and duration is still zero value.
s.Duration = d
shouldRecord := false
if s.finishedSpanHandler != nil {
shouldRecord = s.finishedSpanHandler(&s.SpanModel)
}

if shouldRecord && s.flushOnFinish {
s.tracer.reporter.Send(s.SpanModel)
}
} // else the span is being finished concurrently by another goroutine
}

func (s *spanImpl) Flush() {
Expand Down
6 changes: 4 additions & 2 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Tracer struct {
noop int32 // used as atomic bool (1 = true, 0 = false)
sharedSpans bool
unsampledNoop bool
finishedSpanHandler func(*model.SpanModel) bool
}

// NewTracer returns a new Zipkin Tracer.
Expand Down Expand Up @@ -93,8 +94,9 @@ func (t *Tracer) StartSpan(name string, options ...SpanOption) Span {
Annotations: make([]model.Annotation, 0),
Tags: make(map[string]string),
},
flushOnFinish: true,
tracer: t,
flushOnFinish: true,
tracer: t,
finishedSpanHandler: t.finishedSpanHandler,
}

// add default tracer tags to span
Expand Down
10 changes: 10 additions & 0 deletions tracer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,13 @@ func WithNoopTracer(tracerNoop bool) TracerOption {
return nil
}
}

// WithFinishedSpanHandler if set, can mutate all span data and decide if a span
// should be recorded or not. E.g. user could decide to sample all requests having
// an error tag set or duration over certain value.
func WithFinishedSpanHandler(handler func(*model.SpanModel) bool) TracerOption {
return func(o *Tracer) error {
o.finishedSpanHandler = handler
return nil
}
}
48 changes: 48 additions & 0 deletions tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/openzipkin/zipkin-go/reporter/recorder"

"github.com/openzipkin/zipkin-go/idgenerator"
"github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/reporter"
Expand Down Expand Up @@ -808,3 +810,49 @@ func TestLocalEndpoint(t *testing.T) {
t.Errorf("IPv6 endpoint want %+v, have %+v", want.IPv6, have.IPv6)
}
}

func TestFinishedSpanHandlerAvoidsReporting(t *testing.T) {
rep := recorder.NewReporter()
defer rep.Close()

tracer, _ := NewTracer(
rep,
WithNoopSpan(false),
WithSampler(AlwaysSample),
WithFinishedSpanHandler(func(s *model.SpanModel) bool {
return false
}),
)
sp := tracer.StartSpan("test")
sp.Finish()

if want, have := 0, len(rep.Flush()); want != have {
t.Errorf("unexpected number of spans, want: %d, have: %d", want, have)
}
}

func TestFinishedSpanAddsTagsToSpan(t *testing.T) {
rep := recorder.NewReporter()
defer rep.Close()

tracer, _ := NewTracer(
rep,
WithNoopSpan(false),
WithSampler(AlwaysSample),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need a test for never sample, that if the finished span handler needs all data it can still see it.

note in brave we have a flag on finished span handler which is always record or not (sampledLocal) you might need this to properly implement finished span handler.

WithFinishedSpanHandler(func(s *model.SpanModel) bool {
s.Tags["my_key"] = "my_value"
return true
}),
)
sp := tracer.StartSpan("test")
sp.Finish()

repSans := rep.Flush()
if want, have := 1, len(repSans); want != have {
t.Errorf("unexpected number of spans, want: %d, have: %d", want, have)
}

if want, have := "my_value", repSans[0].Tags["my_key"]; want != have {
t.Errorf("unexpected number of spans, want: %q, have: %q", want, have)
}
}