Skip to content

Commit

Permalink
Merge pull request #47 from DIMO-Network/SI-2749-first-and-last-agg
Browse files Browse the repository at this point in the history
SI-2749-first-and-last-agg
  • Loading branch information
Allyson-English authored Jul 10, 2024
2 parents 30320f1 + dd3ba72 commit 5967cda
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 22 deletions.
13 changes: 11 additions & 2 deletions internal/graph/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 17 additions & 7 deletions internal/graph/model/models_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

105 changes: 102 additions & 3 deletions internal/service/ch/ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (c *CHServiceTestSuite) TestGetAggSignal() {
{
Name: vss.FieldPowertrainType,
Timestamp: c.dataStartTime,
ValueString: "value2,value1,value3",
ValueString: "value10,value3,value2,value9,value7,value5,value4,value8,value1,value6",
Agg: model.StringAggregationUnique.String(),
},
},
Expand Down Expand Up @@ -224,6 +224,106 @@ func (c *CHServiceTestSuite) TestGetAggSignal() {
},
},
},
{
name: "first float",
aggArgs: model.AggregatedSignalArgs{
SignalArgs: model.SignalArgs{
TokenID: 1,
},
FromTS: c.dataStartTime,
ToTS: endTs,
Interval: day.Milliseconds(),
FloatArgs: []model.FloatSignalArgs{
{
Name: vss.FieldSpeed,
Agg: model.FloatAggregationFirst,
},
},
},
expected: []model.AggSignal{
{
Name: vss.FieldSpeed,
Timestamp: c.dataStartTime,
ValueNumber: 0,
Agg: model.FloatAggregationFirst.String(),
},
},
},
{
name: "last float",
aggArgs: model.AggregatedSignalArgs{
SignalArgs: model.SignalArgs{
TokenID: 1,
},
FromTS: c.dataStartTime,
ToTS: endTs,
Interval: day.Milliseconds(),
FloatArgs: []model.FloatSignalArgs{
{
Name: vss.FieldSpeed,
Agg: model.FloatAggregationLast,
},
},
},
expected: []model.AggSignal{
{
Name: vss.FieldSpeed,
Timestamp: c.dataStartTime,
ValueNumber: dataPoints - 1,
Agg: model.FloatAggregationLast.String(),
},
},
},
{
name: "first string",
aggArgs: model.AggregatedSignalArgs{
SignalArgs: model.SignalArgs{
TokenID: 1,
},
FromTS: c.dataStartTime,
ToTS: endTs,
Interval: day.Milliseconds(),
StringArgs: []model.StringSignalArgs{
{
Name: vss.FieldPowertrainType,
Agg: model.StringAggregationFirst,
},
},
},
expected: []model.AggSignal{
{
Name: vss.FieldPowertrainType,
Timestamp: c.dataStartTime,
ValueString: "value1",
Agg: model.StringAggregationFirst.String(),
},
},
},
{
name: "last string",
aggArgs: model.AggregatedSignalArgs{
SignalArgs: model.SignalArgs{
TokenID: 1,
},
FromTS: c.dataStartTime,
ToTS: endTs,
Interval: day.Milliseconds(),
StringArgs: []model.StringSignalArgs{
{
Name: vss.FieldPowertrainType,
Agg: model.StringAggregationLast,
},
},
},
expected: []model.AggSignal{
{
Name: vss.FieldPowertrainType,
Timestamp: c.dataStartTime,
ValueString: fmt.Sprintf("value%d", dataPoints),
Agg: model.StringAggregationLast.String(),
},
},
},
}
for _, tc := range testCases {
c.Run(tc.name, func() {
Expand Down Expand Up @@ -373,9 +473,8 @@ func (c *CHServiceTestSuite) insertTestData() {
Timestamp: c.dataStartTime.Add(time.Second * time.Duration(30*i)),
Source: sources[i%3],
TokenID: 1,
ValueString: fmt.Sprintf("value%d", i%3+1),
ValueString: fmt.Sprintf("value%d", i+1),
}

testSignal = append(testSignal, numSig, strSig)
}

Expand Down
28 changes: 20 additions & 8 deletions internal/service/ch/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,22 @@ const (

// Aggregation functions for float signals.
const (
avgGroup = "avg(" + vss.ValueNumberCol + ")"
randFloatGroup = "groupArraySample(1, %d)(" + vss.ValueNumberCol + ")[1]"
minGroup = "min(" + vss.ValueNumberCol + ")"
maxGroup = "max(" + vss.ValueNumberCol + ")"
medGroup = "median(" + vss.ValueNumberCol + ")"
avgGroup = "avg(" + vss.ValueNumberCol + ")"
randFloatGroup = "groupArraySample(1, %d)(" + vss.ValueNumberCol + ")[1]"
minGroup = "min(" + vss.ValueNumberCol + ")"
maxGroup = "max(" + vss.ValueNumberCol + ")"
medGroup = "median(" + vss.ValueNumberCol + ")"
firstFloatGroup = "argMin(" + vss.ValueNumberCol + ", " + vss.TimestampCol + ")"
lastFloatGroup = "argMax(" + vss.ValueNumberCol + ", " + vss.TimestampCol + ")"
)

// Aggregation functions for string signals.
const (
randStringGroup = "groupArraySample(1, %d)(" + vss.ValueStringCol + ")[1]"
uniqueGroup = "arrayStringConcat(groupUniqArray(" + vss.ValueStringCol + "),',')"
topGroup = "arrayStringConcat(topK(1, 10)(" + vss.ValueStringCol + "))"
randStringGroup = "groupArraySample(1, %d)(" + vss.ValueStringCol + ")[1]"
uniqueGroup = "arrayStringConcat(groupUniqArray(" + vss.ValueStringCol + "),',')"
topGroup = "arrayStringConcat(topK(1, 10)(" + vss.ValueStringCol + "))"
firstStringGroup = "argMin(" + vss.ValueStringCol + ", " + vss.TimestampCol + ")"
lastStringGroup = "argMax(" + vss.ValueStringCol + ", " + vss.TimestampCol + ")"
)

// TODO: remove this map when we move to storing the device address
Expand Down Expand Up @@ -145,6 +149,10 @@ func getFloatAggFunc(aggType model.FloatAggregation) string {
aggStr = maxGroup
case model.FloatAggregationMed:
aggStr = medGroup
case model.FloatAggregationFirst:
aggStr = firstFloatGroup
case model.FloatAggregationLast:
aggStr = lastFloatGroup
}
return aggStr
}
Expand All @@ -160,6 +168,10 @@ func getStringAgg(aggType model.StringAggregation) string {
aggStr = uniqueGroup
case model.StringAggregationTop:
aggStr = topGroup
case model.StringAggregationFirst:
aggStr = firstStringGroup
case model.StringAggregationLast:
aggStr = lastStringGroup
}
return aggStr
}
Expand Down
13 changes: 11 additions & 2 deletions schema/base.graphqls
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ type Query {
"""
SignalsLatest returns the latest signals for a given token.
"""
signalsLatest(tokenId: Int!, filter: SignalFilter): SignalCollection
@requiresToken
signalsLatest(tokenId: Int!, filter: SignalFilter): SignalCollection @requiresToken
}
type SignalAggregations {
"""
Expand All @@ -56,6 +55,8 @@ enum FloatAggregation {
MAX
MIN
RAND
FIRST
LAST
}

enum StringAggregation {
Expand All @@ -71,6 +72,14 @@ enum StringAggregation {
Return a list of unique values in the group.
"""
UNIQUE
"""
Return value in group associated with the minimum time value.
"""
FIRST
"""
Return value in group associated with the maximum time value.
"""
LAST
}
type SignalFloat {
"""
Expand Down

0 comments on commit 5967cda

Please sign in to comment.