Skip to content

Commit 2538e83

Browse files
authored
Implement report telemetry for LLO; fix typing for LLO telemetry packets (smartcontractkit#16712)
* Implement report telemetry for LLO * Support Outcome telemetry * Suppress errors decoding bad offchain config
1 parent 75785b6 commit 2538e83

32 files changed

+683
-149
lines changed

core/scripts/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ require (
3333
github.com/shopspring/decimal v1.4.0
3434
github.com/smartcontractkit/chainlink-automation v0.8.1
3535
github.com/smartcontractkit/chainlink-common v0.4.2-0.20250310180230-58f4a9810e21
36-
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250228145850-f846693a6fd5
36+
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250312151008-ab5d35236de4
3737
github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250311180911-0754238e140b
3838
github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.22
3939
github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298

core/scripts/go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -1124,8 +1124,8 @@ github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250307105933-7
11241124
github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250307105933-7912a5e97ad0/go.mod h1:2Y6JRpvj9EkgKgymvenuqCnra07+NYVB6+0rQGETSGA=
11251125
github.com/smartcontractkit/chainlink-common v0.4.2-0.20250310180230-58f4a9810e21 h1:Z+IZ7znVnD2idSmZAT72ilj13rKYLxVt4GVBOCNOPUc=
11261126
github.com/smartcontractkit/chainlink-common v0.4.2-0.20250310180230-58f4a9810e21/go.mod h1:YQuXIqQpmpAqstWV0LHaDTJ5nsSWuip5ivEM+Fisb+4=
1127-
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250228145850-f846693a6fd5 h1:cJpPJ5hEwc6vlMoxmATS60uWPUi62ydnTVBabu6WKEE=
1128-
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250228145850-f846693a6fd5/go.mod h1:2yUpKW1/jFxpozO/Zkh3fKDzI0jthXoEcU2xuDq+vlo=
1127+
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250312151008-ab5d35236de4 h1:fNKO/YvUHiNASP7yrboYHQHqZPe+gPLxlv9UhBHdOxc=
1128+
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250312151008-ab5d35236de4/go.mod h1:2yUpKW1/jFxpozO/Zkh3fKDzI0jthXoEcU2xuDq+vlo=
11291129
github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135 h1:8u9xUrC+yHrTDexOKDd+jrA6LCzFFHeX1G82oj2fsSI=
11301130
github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135/go.mod h1:NkvE4iQgiT7dMCP6U3xPELHhWhN5Xr6rHC0axRebyMU=
11311131
github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250227163723-3c71fefea680 h1:Es/V5imh4at3NdHDWlbDMjljd24TrJQCy/+8xAVajl0=

core/services/llo/data_source.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,14 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
119119
// Size needs to accommodate the max number of telemetry events that could be generated
120120
// Standard case might be about 3 bridge requests per spec and one stream<=>spec
121121
// Overallocate for safety (to avoid dropping packets)
122-
telemCh := d.t.MakeTelemChannel(opts, 10*len(streamValues))
122+
telemCh := d.t.MakeObservationScopedTelemetryCh(opts, 10*len(streamValues))
123123
if telemCh != nil {
124-
ctx = pipeline.WithTelemetryCh(ctx, telemCh)
124+
if d.t.CaptureEATelemetry() {
125+
ctx = pipeline.WithTelemetryCh(ctx, telemCh)
126+
}
127+
if d.t.CaptureObservationTelemetry() {
128+
ctx = WithObservationTelemetryCh(ctx, telemCh)
129+
}
125130
// After all Observations have returned, nothing else will be sent to the
126131
// telemetry channel, so it can safely be closed
127132
defer close(telemCh)

core/services/llo/data_source_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
2424

2525
"github.com/smartcontractkit/chainlink-data-streams/llo"
26+
datastreamsllo "github.com/smartcontractkit/chainlink-data-streams/llo"
2627
"github.com/smartcontractkit/chainlink/v2/core/bridges"
2728
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
2829
clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest"
@@ -116,11 +117,16 @@ func (m *mockTelemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.
116117
defer m.mu.Unlock()
117118
m.v3PremiumLegacyPackets = append(m.v3PremiumLegacyPackets, v3PremiumLegacyPacket{run, trrs, streamID, opts, val, err})
118119
}
119-
120-
func (m *mockTelemeter) MakeTelemChannel(opts llo.DSOpts, size int) (ch chan<- interface{}) {
120+
func (m *mockTelemeter) MakeObservationScopedTelemetryCh(opts llo.DSOpts, size int) (ch chan<- interface{}) {
121121
m.ch = make(chan interface{}, size)
122122
return m.ch
123123
}
124+
func (m *mockTelemeter) GetOutcomeTelemetryCh() chan<- *datastreamsllo.LLOOutcomeTelemetry {
125+
return nil
126+
}
127+
func (m *mockTelemeter) GetReportTelemetryCh() chan<- *datastreamsllo.LLOReportTelemetry { return nil }
128+
func (m *mockTelemeter) CaptureEATelemetry() bool { return true }
129+
func (m *mockTelemeter) CaptureObservationTelemetry() bool { return true }
124130

125131
func Test_DataSource(t *testing.T) {
126132
lggr := logger.TestLogger(t)

core/services/llo/delegate.go

+42-30
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/smartcontractkit/chainlink/v2/core/services/job"
2424
"github.com/smartcontractkit/chainlink/v2/core/services/ocr3/promwrapper"
2525
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
26+
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
2627
)
2728

2829
var _ job.ServiceCtx = &delegate{}
@@ -39,28 +40,31 @@ type delegate struct {
3940

4041
src datastreamsllo.ShouldRetireCache
4142
ds datastreamsllo.DataSource
42-
telem services.Service
43+
telem TelemeterService
4344

4445
oracles []Closer
4546
}
4647

4748
type DelegateConfig struct {
48-
Logger logger.Logger
49-
DataSource sqlutil.DataSource
50-
Runner streams.Runner
51-
Registry Registry
52-
JobName null.String
53-
CaptureEATelemetry bool
49+
Logger logger.Logger
50+
DataSource sqlutil.DataSource
51+
Runner streams.Runner
52+
Registry Registry
53+
JobName null.String
54+
CaptureEATelemetry bool
55+
CaptureObservationTelemetry bool
56+
CaptureOutcomeTelemetry bool
57+
CaptureReportTelemetry bool
5458

5559
// LLO
56-
ChannelDefinitionCache llotypes.ChannelDefinitionCache
57-
ReportingPluginConfig datastreamsllo.Config
58-
RetirementReportCache RetirementReportCache
59-
RetirementReportCodec datastreamsllo.RetirementReportCodec
60-
ShouldRetireCache datastreamsllo.ShouldRetireCache
61-
EAMonitoringEndpoint ocrcommontypes.MonitoringEndpoint
62-
DonID uint32
63-
ChainID string
60+
ChannelDefinitionCache llotypes.ChannelDefinitionCache
61+
ReportingPluginConfig datastreamsllo.Config
62+
RetirementReportCache RetirementReportCache
63+
RetirementReportCodec datastreamsllo.RetirementReportCodec
64+
ShouldRetireCache datastreamsllo.ShouldRetireCache
65+
PluginMonitoringEndpoint telemetry.MultitypeMonitoringEndpoint
66+
DonID uint32
67+
ChainID string
6468

6569
// OCR3
6670
TraceLogging bool
@@ -102,12 +106,15 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) {
102106
}
103107
reportCodecs := NewReportCodecs(codecLggr, cfg.DonID)
104108

105-
var t TelemeterService
106-
if cfg.CaptureEATelemetry {
107-
t = NewTelemeterService(lggr, cfg.EAMonitoringEndpoint, cfg.DonID)
108-
} else {
109-
t = NullTelemeter
110-
}
109+
t := NewTelemeterService(TelemeterParams{
110+
Logger: lggr,
111+
MonitoringEndpoint: cfg.PluginMonitoringEndpoint,
112+
DonID: cfg.DonID,
113+
CaptureEATelemetry: cfg.CaptureEATelemetry,
114+
CaptureObservationTelemetry: cfg.CaptureObservationTelemetry,
115+
CaptureOutcomeTelemetry: cfg.CaptureOutcomeTelemetry,
116+
CaptureReportTelemetry: cfg.CaptureReportTelemetry,
117+
})
111118
ds := newDataSource(logger.Named(lggr, "DataSource"), cfg.Registry, t)
112119

113120
return &delegate{services.StateMachine{}, cfg, reportCodecs, cfg.ShouldRetireCache, ds, t, []Closer{}}, nil
@@ -155,15 +162,20 @@ func (d *delegate) Start(ctx context.Context) error {
155162
OnchainKeyring: d.cfg.OnchainKeyring,
156163
ReportingPluginFactory: promwrapper.NewReportingPluginFactory(
157164
datastreamsllo.NewPluginFactory(
158-
d.cfg.ReportingPluginConfig,
159-
psrrc,
160-
d.src,
161-
d.cfg.RetirementReportCodec,
162-
d.cfg.ChannelDefinitionCache,
163-
d.ds,
164-
logger.Named(lggr, "ReportingPlugin"),
165-
llo.EVMOnchainConfigCodec{},
166-
d.reportCodecs,
165+
datastreamsllo.PluginFactoryParams{
166+
Config: d.cfg.ReportingPluginConfig,
167+
PredecessorRetirementReportCache: psrrc,
168+
ShouldRetireCache: d.src,
169+
RetirementReportCodec: d.cfg.RetirementReportCodec,
170+
ChannelDefinitionCache: d.cfg.ChannelDefinitionCache,
171+
DataSource: d.ds,
172+
Logger: logger.Named(lggr, "ReportingPlugin"),
173+
OnchainConfigCodec: llo.EVMOnchainConfigCodec{},
174+
ReportCodecs: d.reportCodecs,
175+
OutcomeTelemetryCh: d.telem.GetOutcomeTelemetryCh(),
176+
ReportTelemetryCh: d.telem.GetReportTelemetryCh(),
177+
DonID: d.cfg.DonID,
178+
},
167179
),
168180
lggr,
169181
d.cfg.ChainID,

core/services/llo/observation_context.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,20 @@ func (oc *observationContext) Observe(ctx context.Context, streamID streams.Stre
7878
break
7979
}
8080
}
81-
// If no streamID attribute is found in the task results, then assume the
82-
// final output is the stream ID and return that. This is safe to do since
83-
// the registry will never return a spec that doesn't match either by tag
84-
// or by spec streamID.
8581
if !found {
86-
// FIXME: This is a hack specific for V3 telemetry, future schemas should
87-
// use the generic stream value telemetry instead
88-
// https://smartcontract-it.atlassian.net/browse/MERC-6290
82+
// If no streamID attribute is found in the task results, then assume the
83+
// final output is the stream ID and return that. This is safe to do since
84+
// the registry will never return a spec that doesn't match either by tag
85+
// or by spec streamID.
8986
val, err = extractFinalResultAsStreamValue(trrs)
90-
oc.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, err)
87+
if oc.t.CaptureEATelemetry() {
88+
// FIXME: This is a hack specific for V3 telemetry, future schemas should
89+
// use the generic stream value telemetry instead
90+
// https://smartcontract-it.atlassian.net/browse/MERC-6290
91+
oc.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, err)
92+
}
9193
}
92-
if ch := pipeline.GetTelemetryCh(ctx); ch != nil {
94+
if ch := GetObservationTelemetryCh(ctx); ch != nil {
9395
cd := opts.ConfigDigest()
9496
ot := &telem.LLOObservationTelemetry{
9597
StreamId: streamID,

core/services/llo/telem/telem_streams.pb.go

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/services/llo/telem/telem_streams.proto

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ message LLOBridgeTelemetry {
2424
int64 observation_timestamp = 15;
2525
}
2626

27+
// LLOObservationTelemetry packet sent for each stream on every call to
28+
// Observation (once per round)
2729
message LLOObservationTelemetry {
2830
uint32 stream_id = 1;
2931
int32 stream_value_type = 2;

0 commit comments

Comments
 (0)