Skip to content

Commit 79bbad0

Browse files
Add Basic Metering Report to Workflow Engine (smartcontractkit#16398)
* Add Basic Metering Report to Workflow Engine This commit adds a metering report struct to the workflow engine such that each step can be individually added to the report. Metering units and values are generally treated as string values, but post values are verified to be valid numeric. The metering report can provide a calculated median value for all posted units. * update mathematical operations on metering spend value * fix tests and greater than comparison
1 parent 5ac8fee commit 79bbad0

File tree

3 files changed

+242
-0
lines changed

3 files changed

+242
-0
lines changed

core/services/workflows/engine.go

+3
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ type Engine struct {
143143
clock clockwork.Clock
144144
ratelimiter *ratelimiter.RateLimiter
145145
workflowLimits *syncerlimiter.Limits
146+
meterReport *MeteringReport
146147
}
147148

148149
func (e *Engine) Start(_ context.Context) error {
@@ -564,6 +565,8 @@ func generateExecutionID(workflowID, eventID string) (string, error) {
564565

565566
// startExecution kicks off a new workflow execution when a trigger event is received.
566567
func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error {
568+
e.meterReport = NewMeteringReport()
569+
567570
lggr := e.logger.With("event", event, platform.KeyWorkflowExecutionID, executionID)
568571
lggr.Debug("executing on a trigger event")
569572
ec := &store.WorkflowExecution{

core/services/workflows/metering.go

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package workflows
2+
3+
import (
4+
"sort"
5+
"sync"
6+
7+
"github.com/shopspring/decimal"
8+
)
9+
10+
type MeteringReportStepRef string
11+
12+
type MeteringSpendUnit string
13+
14+
func (s MeteringSpendUnit) String() string {
15+
return string(s)
16+
}
17+
18+
func (s MeteringSpendUnit) DecimalToSpendValue(value decimal.Decimal) MeteringSpendValue {
19+
return MeteringSpendValue{value: value, roundingPlace: 18}
20+
}
21+
22+
func (s MeteringSpendUnit) IntToSpendValue(value int64) MeteringSpendValue {
23+
return MeteringSpendValue{value: decimal.NewFromInt(value), roundingPlace: 18}
24+
}
25+
26+
type MeteringSpendValue struct {
27+
value decimal.Decimal
28+
roundingPlace uint8
29+
}
30+
31+
func (v MeteringSpendValue) Add(value MeteringSpendValue) MeteringSpendValue {
32+
return MeteringSpendValue{
33+
value: v.value.Add(value.value),
34+
roundingPlace: v.roundingPlace,
35+
}
36+
}
37+
38+
func (v MeteringSpendValue) Div(value MeteringSpendValue) MeteringSpendValue {
39+
return MeteringSpendValue{
40+
value: v.value.Div(value.value),
41+
roundingPlace: v.roundingPlace,
42+
}
43+
}
44+
45+
func (v MeteringSpendValue) GreaterThan(value MeteringSpendValue) bool {
46+
return v.value.GreaterThan(value.value)
47+
}
48+
49+
func (v MeteringSpendValue) String() string {
50+
return v.value.StringFixedBank(int32(v.roundingPlace))
51+
}
52+
53+
type MeteringReportStep struct {
54+
Peer2PeerID string
55+
SpendUnit MeteringSpendUnit
56+
SpendValue MeteringSpendValue
57+
}
58+
59+
type MeteringReport struct {
60+
mu sync.RWMutex
61+
steps map[MeteringReportStepRef]MeteringReportStep
62+
}
63+
64+
func NewMeteringReport() *MeteringReport {
65+
return &MeteringReport{
66+
steps: make(map[MeteringReportStepRef]MeteringReportStep),
67+
}
68+
}
69+
70+
func (r *MeteringReport) MedianSpend() map[MeteringSpendUnit]MeteringSpendValue {
71+
r.mu.RLock()
72+
defer r.mu.RUnlock()
73+
74+
values := map[MeteringSpendUnit][]MeteringSpendValue{}
75+
medians := map[MeteringSpendUnit]MeteringSpendValue{}
76+
77+
for _, step := range r.steps {
78+
vals, ok := values[step.SpendUnit]
79+
if !ok {
80+
vals = []MeteringSpendValue{}
81+
}
82+
83+
values[step.SpendUnit] = append(vals, step.SpendValue)
84+
}
85+
86+
for unit, set := range values {
87+
sort.Slice(set, func(i, j int) bool {
88+
return set[j].GreaterThan(set[i])
89+
})
90+
91+
if len(set)%2 > 0 {
92+
medians[unit] = set[len(set)/2]
93+
94+
continue
95+
}
96+
97+
medians[unit] = set[len(set)/2-1].Add(set[len(set)/2]).Div(unit.IntToSpendValue(2))
98+
}
99+
100+
return medians
101+
}
102+
103+
func (r *MeteringReport) AddStep(ref MeteringReportStepRef, step MeteringReportStep) error {
104+
r.mu.Lock()
105+
defer r.mu.Unlock()
106+
107+
r.steps[ref] = step
108+
109+
return nil
110+
}
+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package workflows_test
2+
3+
import (
4+
"strconv"
5+
"testing"
6+
7+
"github.com/shopspring/decimal"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
"golang.org/x/exp/maps"
11+
12+
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
13+
)
14+
15+
func TestMeteringReport(t *testing.T) {
16+
t.Parallel()
17+
18+
testUnitA := workflows.MeteringSpendUnit("a")
19+
testUnitB := workflows.MeteringSpendUnit("b")
20+
21+
t.Run("MedianSpend returns median for multiple spend units", func(t *testing.T) {
22+
t.Parallel()
23+
24+
report := workflows.NewMeteringReport()
25+
steps := []workflows.MeteringReportStep{
26+
{"abc", testUnitA, testUnitA.IntToSpendValue(1)},
27+
{"xyz", testUnitA, testUnitA.IntToSpendValue(2)},
28+
{"abc", testUnitA, testUnitA.IntToSpendValue(3)},
29+
{"abc", testUnitB, testUnitB.DecimalToSpendValue(decimal.NewFromFloat(0.1))},
30+
{"xyz", testUnitB, testUnitB.DecimalToSpendValue(decimal.NewFromFloat(0.2))},
31+
{"abc", testUnitB, testUnitB.DecimalToSpendValue(decimal.NewFromFloat(0.3))},
32+
}
33+
34+
for idx, step := range steps {
35+
require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step))
36+
}
37+
38+
expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{
39+
testUnitA: testUnitB.IntToSpendValue(2),
40+
testUnitB: testUnitB.DecimalToSpendValue(decimal.NewFromFloat(0.2)),
41+
}
42+
43+
median := report.MedianSpend()
44+
45+
require.Len(t, median, 2)
46+
require.Contains(t, maps.Keys(median), testUnitA)
47+
require.Contains(t, maps.Keys(median), testUnitB)
48+
49+
assert.Equal(t, expected[testUnitA].String(), median[testUnitA].String())
50+
assert.Equal(t, expected[testUnitB].String(), median[testUnitB].String())
51+
})
52+
53+
t.Run("MedianSpend returns median single spend value", func(t *testing.T) {
54+
t.Parallel()
55+
56+
report := workflows.NewMeteringReport()
57+
steps := []workflows.MeteringReportStep{
58+
{"abc", testUnitA, testUnitA.IntToSpendValue(1)},
59+
}
60+
61+
for idx, step := range steps {
62+
require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step))
63+
}
64+
65+
expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{
66+
testUnitA: testUnitA.IntToSpendValue(1),
67+
}
68+
69+
median := report.MedianSpend()
70+
71+
require.Len(t, median, 1)
72+
require.Contains(t, maps.Keys(median), testUnitA)
73+
74+
assert.Equal(t, expected[testUnitA].String(), median[testUnitA].String())
75+
})
76+
77+
t.Run("MedianSpend returns median odd number of spend values", func(t *testing.T) {
78+
t.Parallel()
79+
80+
report := workflows.NewMeteringReport()
81+
steps := []workflows.MeteringReportStep{
82+
{"abc", testUnitA, testUnitA.IntToSpendValue(1)},
83+
{"abc", testUnitA, testUnitA.IntToSpendValue(3)},
84+
{"xyz", testUnitA, testUnitA.IntToSpendValue(2)},
85+
}
86+
87+
for idx, step := range steps {
88+
require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step))
89+
}
90+
91+
expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{
92+
testUnitA: testUnitA.IntToSpendValue(2),
93+
}
94+
95+
median := report.MedianSpend()
96+
97+
require.Len(t, median, 1)
98+
require.Contains(t, maps.Keys(median), testUnitA)
99+
100+
assert.Equal(t, expected[testUnitA].String(), median[testUnitA].String())
101+
})
102+
103+
t.Run("MedianSpend returns median as average for even number of spend values", func(t *testing.T) {
104+
t.Parallel()
105+
106+
report := workflows.NewMeteringReport()
107+
steps := []workflows.MeteringReportStep{
108+
{"xyz", testUnitA, testUnitA.IntToSpendValue(42)},
109+
{"abc", testUnitA, testUnitA.IntToSpendValue(1)},
110+
{"abc", testUnitA, testUnitA.IntToSpendValue(3)},
111+
{"xyz", testUnitA, testUnitA.IntToSpendValue(2)},
112+
}
113+
114+
for idx, step := range steps {
115+
require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step))
116+
}
117+
118+
expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{
119+
testUnitA: testUnitA.DecimalToSpendValue(decimal.NewFromFloat(2.5)),
120+
}
121+
122+
median := report.MedianSpend()
123+
124+
require.Len(t, median, 1)
125+
require.Contains(t, maps.Keys(median), testUnitA)
126+
127+
assert.Equal(t, expected[testUnitA].String(), median[testUnitA].String())
128+
})
129+
}

0 commit comments

Comments
 (0)