Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve tracing data and include lotus tracing #794

Merged
merged 10 commits into from
Jan 17, 2022
58 changes: 50 additions & 8 deletions chain/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -158,9 +159,15 @@ func NewTipSetIndexer(node lens.API, d model.Storage, window time.Duration, name

// TipSet is called when a new tipset has been discovered
func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
ctx, span := otel.Tracer("").Start(ctx, "Indexer.TipSet")
ctx, span := otel.Tracer("").Start(ctx, "TipSetIndexer.TipSet")
if span.IsRecording() {
span.SetAttributes(attribute.String("tipset", ts.String()), attribute.Int64("height", int64(ts.Height())))
span.SetAttributes(
attribute.String("tipset", ts.String()),
attribute.Int64("height", int64(ts.Height())),
attribute.String("name", t.name),
attribute.String("window", t.window.String()),
attribute.StringSlice("tasks", t.tasks),
)
}
defer span.End()

Expand Down Expand Up @@ -211,6 +218,15 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
return nil
}

if span.IsRecording() {
span.SetAttributes(
attribute.String("next_tipset", next.String()),
attribute.Int64("next_height", int64(next.Height())),
attribute.String("current_tipset", current.String()),
attribute.Int64("current_height", int64(current.Height())),
)
}

ll := log.With("current", int64(current.Height()), "next", int64(next.Height()))
ll.Debugw("indexing tipset")

Expand Down Expand Up @@ -386,12 +402,14 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
if _, complete := completed[name]; !complete {
taskOutputs[name] = model.PersistableList{t.buildSkippedTipsetReport(ts, name, start, "indexer not ready")}
ll.Infow("task skipped", "task", name, "reason", "indexer not ready")
span.AddEvent(fmt.Sprintf("skipped task: %s", res.Task))
}
}
stats.Record(ctx, metrics.TipSetSkip.M(1))
goto persist
case res = <-results:
}
span.AddEvent(fmt.Sprintf("completed task: %s", res.Task))
inFlight--

llt := ll.With("task", res.Task)
Expand Down Expand Up @@ -450,8 +468,16 @@ persist:

// Persist all results
go func() {
ctx, persistSpan := otel.Tracer("").Start(ctx, "TipSetIndexer.Persist")
if persistSpan.IsRecording() {
persistSpan.SetAttributes(
attribute.String("tipset", ts.String()),
attribute.Int64("height", int64(ts.Height())),
)
}
// free up the slot when done
defer func() {
persistSpan.End()
<-t.persistSlot
}()

Expand All @@ -477,11 +503,13 @@ persist:
wg.Wait()
ll.Infow("tasks complete", "total_time", time.Since(start))
}()

return nil
}

func (t *TipSetIndexer) runProcessor(ctx context.Context, p TipSetProcessor, name string, ts *types.TipSet, results chan *TaskResult) {
ctx, span := otel.Tracer("").Start(ctx, fmt.Sprintf("TipSetIndexer.Processor.%s", name))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name))
stats.Record(ctx, metrics.TipsetHeight.M(int64(ts.Height())))
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
Expand Down Expand Up @@ -540,9 +568,12 @@ func (t *TipSetIndexer) getGenesisActors(ctx context.Context) (map[string]lens.A
// and applies it to versions of state trees supporting it. These include Version 2 and 3 of the lotus state tree implementation.
// stateChangedActors will fall back to the lotus API method when the optimized diffing cannot be applied.
func (t *TipSetIndexer) stateChangedActors(ctx context.Context, old, new cid.Cid) (map[string]lens.ActorStateChange, error) {
ctx, span := otel.Tracer("").Start(ctx, "StateChangedActors")
ctx, span := otel.Tracer("").Start(ctx, "TipSetIndexer.StateChangedActors")
if span.IsRecording() {
span.SetAttributes(attribute.String("old", old.String()), attribute.String("new", new.String()))
span.SetAttributes(
attribute.String("old", old.String()),
attribute.String("new", new.String()),
)
}
defer span.End()

Expand All @@ -561,9 +592,7 @@ func (t *TipSetIndexer) stateChangedActors(ctx context.Context, old, new cid.Cid
}

if newVersion == oldVersion && (newVersion != types.StateTreeVersion0 && newVersion != types.StateTreeVersion1) {
if span.IsRecording() {
span.SetAttributes(attribute.String("diff", "fast"))
}
span.SetAttributes(attribute.String("diff", "fast"))
// TODO: replace hamt.UseTreeBitWidth and hamt.UseHashFunction with values based on network version
changes, err := hamt.Diff(ctx, t.node.Store(), t.node.Store(), oldRoot, newRoot, hamt.UseTreeBitWidth(5), hamt.UseHashFunction(func(input []byte) []byte {
res := sha256.Sum256(input)
Expand Down Expand Up @@ -609,6 +638,7 @@ func (t *TipSetIndexer) stateChangedActors(ctx context.Context, old, new cid.Cid
return out, nil
}
}
span.SetAttributes(attribute.String("diff", "slow"))
log.Debug("using slow state diff")
actors, err := t.node.StateChangedActors(ctx, old, new)
if err != nil {
Expand All @@ -626,6 +656,9 @@ func (t *TipSetIndexer) stateChangedActors(ctx context.Context, old, new cid.Cid
}

func (t *TipSetIndexer) runMessageProcessor(ctx context.Context, p MessageProcessor, name string, ts, pts *types.TipSet, emsgs []*lens.ExecutedMessage, blkMsgs []*lens.BlockMessages, results chan *TaskResult) {
ctx, span := otel.Tracer("").Start(ctx, fmt.Sprintf("TipSetIndexer.MessageProcessor.%s", name))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name))
stats.Record(ctx, metrics.TipsetHeight.M(int64(ts.Height())))
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
Expand Down Expand Up @@ -653,6 +686,9 @@ func (t *TipSetIndexer) runMessageProcessor(ctx context.Context, p MessageProces
}

func (t *TipSetIndexer) runConsensusProcessor(ctx context.Context, p TipSetsProcessor, name string, ts, pts *types.TipSet, results chan *TaskResult) {
ctx, span := otel.Tracer("").Start(ctx, fmt.Sprintf("TipSetIndexer.ConsensusProcessor.%s", name))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name))
stats.Record(ctx, metrics.TipsetHeight.M(int64(ts.Height())))
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
Expand Down Expand Up @@ -680,6 +716,9 @@ func (t *TipSetIndexer) runConsensusProcessor(ctx context.Context, p TipSetsProc
}

func (t *TipSetIndexer) runActorProcessor(ctx context.Context, p ActorProcessor, name string, ts, pts *types.TipSet, actors map[string]lens.ActorStateChange, emsgs []*lens.ExecutedMessage, results chan *TaskResult) {
ctx, span := otel.Tracer("").Start(ctx, fmt.Sprintf("TipSetIndexer.ActorProcessor.%s", name))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name))
stats.Record(ctx, metrics.TipsetHeight.M(int64(ts.Height())))
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
Expand Down Expand Up @@ -707,6 +746,9 @@ func (t *TipSetIndexer) runActorProcessor(ctx context.Context, p ActorProcessor,
}

func (t *TipSetIndexer) runMessageExecutionProcessor(ctx context.Context, p MessageExecutionProcessor, name string, ts, pts *types.TipSet, imsgs []*lens.MessageExecution, results chan *TaskResult) {
ctx, span := otel.Tracer("").Start(ctx, fmt.Sprintf("TipSetIndexer.MessageExecutionProcessor.%s", name))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name))
stats.Record(ctx, metrics.TipsetHeight.M(int64(ts.Height())))
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
Expand Down
15 changes: 12 additions & 3 deletions chain/walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package chain

import (
"context"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/xerrors"

"github.com/filecoin-project/lily/lens"
Expand Down Expand Up @@ -72,11 +70,20 @@ func (c *Walker) Done() <-chan struct{} {
}

func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet) error {
ctx, span := otel.Tracer("").Start(ctx, "Walker.WalkChain", trace.WithAttributes(attribute.Int64("height", c.maxHeight)))
ctx, span := otel.Tracer("").Start(ctx, "Walker.WalkChain")
if span.IsRecording() {
span.SetAttributes(
attribute.Int64("height", int64(ts.Height())),
attribute.String("tipset", ts.String()),
attribute.Int64("min_height", c.minHeight),
attribute.Int64("max_height", c.maxHeight),
)
}
defer span.End()

log.Debugw("found tipset", "height", ts.Height())
if err := c.obs.TipSet(ctx, ts); err != nil {
span.RecordError(err)
return xerrors.Errorf("notify tipset: %w", err)
}

Expand All @@ -90,6 +97,7 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet)

ts, err = node.ChainGetTipSet(ctx, ts.Parents())
if err != nil {
span.RecordError(err)
return xerrors.Errorf("get tipset: %w", err)
}

Expand All @@ -99,6 +107,7 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet)

log.Debugw("found tipset", "height", ts.Height())
if err := c.obs.TipSet(ctx, ts); err != nil {
span.RecordError(err)
return xerrors.Errorf("notify tipset: %w", err)
}

Expand Down
11 changes: 10 additions & 1 deletion chain/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package chain
import (
"context"
"errors"

"github.com/filecoin-project/lotus/chain/types"
"github.com/gammazero/workerpool"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/xerrors"

"github.com/filecoin-project/lily/metrics"
Expand Down Expand Up @@ -115,6 +116,14 @@ func (c *Watcher) index(ctx context.Context, he *HeadEvent) error {

// maybeIndexTipSet is called when a new tipset has been discovered
func (c *Watcher) maybeIndexTipSet(ctx context.Context, ts *types.TipSet) error {
ctx, span := otel.Tracer("").Start(ctx, "Watcher.maybeIndexTipSet")
if span.IsRecording() {
span.SetAttributes(
attribute.String("tipset", ts.String()),
attribute.Int64("height", int64(ts.Height())),
)
}
defer span.End()
// Process the tipset if we can, otherwise skip it so we don't block if indexing is too slow
select {
case <-ctx.Done():
Expand Down
12 changes: 2 additions & 10 deletions commands/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/mitchellh/go-homedir"
"github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"golang.org/x/xerrors"

"github.com/filecoin-project/lily/commands/util"
Expand Down Expand Up @@ -178,14 +176,8 @@ Note that jobs are not persisted between restarts of the daemon. See
return xerrors.Errorf("setup metrics: %w", err)
}

if VisorTracingFlags.Tracing {
tp, err := NewJaegerTraceProvider(VisorTracingFlags)
if err != nil {
return xerrors.Errorf("setup tracing: %w", err)
}
otel.SetTracerProvider(tp)
} else {
otel.SetTracerProvider(trace.NewNoopTracerProvider())
if err := setupTracing(VisorTracingFlags); err != nil {
return xerrors.Errorf("setup tracing: %w", err)
}

ctx := context.Background()
Expand Down
63 changes: 23 additions & 40 deletions commands/setup.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package commands

import (
"fmt"
octrace "go.opencensus.io/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/bridge/opencensus"
"net/http"
"net/http/pprof"
"strings"
Expand All @@ -16,10 +18,6 @@ import (
"github.com/prometheus/client_golang/prometheus/collectors"
"go.opencensus.io/stats/view"
"go.opencensus.io/zpages"
"go.opentelemetry.io/otel/exporters/trace/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv"
"golang.org/x/xerrors"

"github.com/filecoin-project/lily/metrics"
Expand All @@ -36,11 +34,9 @@ type VisorLogOpts struct {
var VisorLogFlags VisorLogOpts

// VisorTracingOpts carries the tracing options consumed when the tracer
// provider is set up.
type VisorTracingOpts struct {
// Tracing is checked by the daemon command before installing a Jaeger trace provider.
Tracing bool
// JaegerHost and JaegerPort are formatted into the collector endpoint
// "http://<host>:<port>/api/traces" by NewJaegerTraceProvider.
JaegerHost string
JaegerPort int
// JaegerName is used as the service name by NewJaegerTraceProvider.
JaegerName string
// NOTE(review): JaegerSampleType is not read by any code visible here — confirm whether it is still needed.
JaegerSampleType string
// Enabled gates setupTracing: when false, setupTracing returns nil immediately.
Enabled bool
// ServiceName is passed to metrics.NewJaegerTraceProvider and used as the tracer name.
ServiceName string
// ProviderURL is passed to metrics.NewJaegerTraceProvider.
ProviderURL string
// JaegerSamplerParam is the sampling ratio handed to the trace provider constructor.
JaegerSamplerParam float64
}

Expand All @@ -52,36 +48,6 @@ type VisorMetricOpts struct {

var VisorMetricFlags VisorMetricOpts

// NewJaegerTraceProvider builds a tracer provider that batches spans to the
// Jaeger collector endpoint http://<JaegerHost>:<JaegerPort>/api/traces and
// tags them with flags.JaegerName as the service name.
//
// The sampler is chosen from flags.JaegerSamplerParam:
//   - strictly between 0 and 1: parent-based, trace-ID-ratio sampling
//   - exactly 1: always sample
//   - anything else: never sample
//
// It returns the exporter construction error unwrapped.
func NewJaegerTraceProvider(flags VisorTracingOpts) (*sdktrace.TracerProvider, error) {
	var (
		name     = flags.JaegerName
		ratio    = flags.JaegerSamplerParam
		endpoint = fmt.Sprintf("http://%s:%d/api/traces", flags.JaegerHost, flags.JaegerPort)
	)
	log.Infow("creating jaeger trace provider", "name", name, "ratio", ratio, "endpoint", endpoint)

	var sampler sdktrace.Sampler
	switch {
	case ratio > 0 && ratio < 1:
		sampler = sdktrace.ParentBased(sdktrace.TraceIDRatioBased(ratio))
	case ratio == 1:
		sampler = sdktrace.AlwaysSample()
	default:
		sampler = sdktrace.NeverSample()
	}

	exporter, err := jaeger.NewRawExporter(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(endpoint)))
	if err != nil {
		return nil, err
	}

	return sdktrace.NewTracerProvider(
		// Always be sure to batch in production.
		sdktrace.WithBatcher(exporter),
		sdktrace.WithSampler(sampler),
		sdktrace.WithResource(resource.NewWithAttributes(
			semconv.ServiceNameKey.String(name),
		)),
	), nil
}

func setupLogging(flags VisorLogOpts) error {
ll := flags.LogLevel
if err := logging.SetLogLevel("*", ll); err != nil {
Expand Down Expand Up @@ -161,3 +127,20 @@ func setupMetrics(flags VisorMetricOpts) error {
}()
return nil
}

// setupTracing installs a Jaeger-backed OpenTelemetry tracer provider as the
// global provider when flags.Enabled is set. When tracing is disabled it
// returns nil and leaves the global tracing state untouched.
//
// The provider is built from flags.ServiceName, flags.ProviderURL and
// flags.JaegerSamplerParam. Only the flags argument is consulted, never the
// package-level VisorTracingFlags, so callers can set up tracing from any
// options value.
//
// It returns an error if the trace provider cannot be created. The message
// does not repeat "setup tracing" because the caller already adds that context.
func setupTracing(flags VisorTracingOpts) error {
	if !flags.Enabled {
		return nil
	}

	tp, err := metrics.NewJaegerTraceProvider(flags.ServiceName, flags.ProviderURL, flags.JaegerSamplerParam)
	if err != nil {
		return xerrors.Errorf("new jaeger trace provider: %w", err)
	}
	otel.SetTracerProvider(tp)

	// Upgrades libraries (lotus) that use OpenCensus to OpenTelemetry to facilitate a migration.
	octrace.DefaultTracer = opencensus.NewTracer(tp.Tracer(flags.ServiceName))

	return nil
}
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ services:
- "6831:6831/udp"
- "5778:5778"
- "16686:16686"
- "14268:14268"

prometheus:
image: prom/prometheus:v2.1.0
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ require (
github.com/urfave/cli/v2 v2.3.0
github.com/whyrusleeping/cbor-gen v0.0.0-20210713220151-be142a5ae1a8
go.opencensus.io v0.23.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/exporters/trace/jaeger v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
go.opentelemetry.io/otel v1.2.0
go.opentelemetry.io/otel/bridge/opencensus v0.25.0
go.opentelemetry.io/otel/exporters/jaeger v1.2.0
go.opentelemetry.io/otel/sdk v1.2.0
go.uber.org/fx v1.15.0
go.uber.org/zap v1.19.0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
Expand Down
Loading