diff --git a/internal/graph/generated.go b/internal/graph/generated.go index e12a2a0..32883b9 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -1194,8 +1194,7 @@ type Query { """ SignalsLatest returns the latest signals for a given token. """ - signalsLatest(tokenId: Int!, filter: SignalFilter): SignalCollection - @requiresToken + signalsLatest(tokenId: Int!, filter: SignalFilter): SignalCollection @requiresToken } type SignalAggregations { """ @@ -1217,6 +1216,8 @@ enum FloatAggregation { MAX MIN RAND + FIRST + LAST } enum StringAggregation { @@ -1232,6 +1233,14 @@ enum StringAggregation { Return a list of unique values in the group. """ UNIQUE + """ + Return value in group associated with the minimum time value. + """ + FIRST + """ + Return value in group associated with the maximum time value. + """ + LAST } type SignalFloat { """ diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 15b4522..d04756c 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -163,11 +163,13 @@ type Vinvc struct { type FloatAggregation string const ( - FloatAggregationAvg FloatAggregation = "AVG" - FloatAggregationMed FloatAggregation = "MED" - FloatAggregationMax FloatAggregation = "MAX" - FloatAggregationMin FloatAggregation = "MIN" - FloatAggregationRand FloatAggregation = "RAND" + FloatAggregationAvg FloatAggregation = "AVG" + FloatAggregationMed FloatAggregation = "MED" + FloatAggregationMax FloatAggregation = "MAX" + FloatAggregationMin FloatAggregation = "MIN" + FloatAggregationRand FloatAggregation = "RAND" + FloatAggregationFirst FloatAggregation = "FIRST" + FloatAggregationLast FloatAggregation = "LAST" ) var AllFloatAggregation = []FloatAggregation{ @@ -176,11 +178,13 @@ var AllFloatAggregation = []FloatAggregation{ FloatAggregationMax, FloatAggregationMin, FloatAggregationRand, + FloatAggregationFirst, + FloatAggregationLast, } func (e FloatAggregation) IsValid() bool { switch e { - case FloatAggregationAvg, FloatAggregationMed, FloatAggregationMax, FloatAggregationMin, FloatAggregationRand: + case FloatAggregationAvg, FloatAggregationMed, FloatAggregationMax, FloatAggregationMin, FloatAggregationRand, FloatAggregationFirst, FloatAggregationLast: return true } return false @@ -263,17 +267,23 @@ const ( StringAggregationTop StringAggregation = "TOP" // Return a list of unique values in the group. StringAggregationUnique StringAggregation = "UNIQUE" + // Return value in group associated with the minimum time value. + StringAggregationFirst StringAggregation = "FIRST" + // Return value in group associated with the maximum time value. + StringAggregationLast StringAggregation = "LAST" ) var AllStringAggregation = []StringAggregation{ StringAggregationRand, StringAggregationTop, StringAggregationUnique, + StringAggregationFirst, + StringAggregationLast, } func (e StringAggregation) IsValid() bool { switch e { - case StringAggregationRand, StringAggregationTop, StringAggregationUnique: + case StringAggregationRand, StringAggregationTop, StringAggregationUnique, StringAggregationFirst, StringAggregationLast: return true } return false diff --git a/internal/service/ch/ch_test.go b/internal/service/ch/ch_test.go index 74f8399..6ed002f 100644 --- a/internal/service/ch/ch_test.go +++ b/internal/service/ch/ch_test.go @@ -191,7 +191,7 @@ func (c *CHServiceTestSuite) TestGetAggSignal() { { Name: vss.FieldPowertrainType, Timestamp: c.dataStartTime, - ValueString: "value2,value1,value3", + ValueString: "value10,value3,value2,value9,value7,value5,value4,value8,value1,value6", Agg: model.StringAggregationUnique.String(), }, }, @@ -224,6 +224,106 @@ func (c *CHServiceTestSuite) TestGetAggSignal() { }, }, }, + { + name: "first float", + aggArgs: model.AggregatedSignalArgs{ + SignalArgs: model.SignalArgs{ + TokenID: 1, + }, + FromTS: c.dataStartTime, + ToTS: endTs, + Interval: day.Milliseconds(), + FloatArgs: []model.FloatSignalArgs{ + { + Name: vss.FieldSpeed, + Agg: model.FloatAggregationFirst, + }, + }, + }, + expected: []model.AggSignal{ + { + Name: vss.FieldSpeed, + Timestamp: c.dataStartTime, + ValueNumber: 0, + Agg: model.FloatAggregationFirst.String(), + }, + }, + }, + { + name: "last float", + aggArgs: model.AggregatedSignalArgs{ + SignalArgs: model.SignalArgs{ + TokenID: 1, + }, + FromTS: c.dataStartTime, + ToTS: endTs, + Interval: day.Milliseconds(), + FloatArgs: []model.FloatSignalArgs{ + { + Name: vss.FieldSpeed, + Agg: model.FloatAggregationLast, + }, + }, + }, + expected: []model.AggSignal{ + { + Name: vss.FieldSpeed, + Timestamp: c.dataStartTime, + ValueNumber: dataPoints - 1, + Agg: model.FloatAggregationLast.String(), + }, + }, + }, + { + name: "first string", + aggArgs: model.AggregatedSignalArgs{ + SignalArgs: model.SignalArgs{ + TokenID: 1, + }, + FromTS: c.dataStartTime, + ToTS: endTs, + Interval: day.Milliseconds(), + StringArgs: []model.StringSignalArgs{ + { + Name: vss.FieldPowertrainType, + Agg: model.StringAggregationFirst, + }, + }, + }, + expected: []model.AggSignal{ + { + Name: vss.FieldPowertrainType, + Timestamp: c.dataStartTime, + ValueString: "value1", + Agg: model.StringAggregationFirst.String(), + }, + }, + }, + { + name: "last string", + aggArgs: model.AggregatedSignalArgs{ + SignalArgs: model.SignalArgs{ + TokenID: 1, + }, + FromTS: c.dataStartTime, + ToTS: endTs, + Interval: day.Milliseconds(), + StringArgs: []model.StringSignalArgs{ + { + Name: vss.FieldPowertrainType, + Agg: model.StringAggregationLast, + }, + }, + }, + expected: []model.AggSignal{ + { + Name: vss.FieldPowertrainType, + Timestamp: c.dataStartTime, + ValueString: fmt.Sprintf("value%d", dataPoints), + Agg: model.StringAggregationLast.String(), + }, + }, + }, } for _, tc := range testCases { c.Run(tc.name, func() { @@ -373,9 +473,8 @@ func (c *CHServiceTestSuite) insertTestData() { Timestamp: c.dataStartTime.Add(time.Second * time.Duration(30*i)), Source: sources[i%3], TokenID: 1, - ValueString: fmt.Sprintf("value%d", i%3+1), + ValueString: fmt.Sprintf("value%d", i+1), } - testSignal = append(testSignal, numSig, strSig) } diff --git a/internal/service/ch/queries.go b/internal/service/ch/queries.go index 4c86bfd..33d0bbc 100644 --- a/internal/service/ch/queries.go +++ b/internal/service/ch/queries.go @@ -44,18 +44,22 @@ const ( // Aggregation functions for float signals. const ( - avgGroup = "avg(" + vss.ValueNumberCol + ")" - randFloatGroup = "groupArraySample(1, %d)(" + vss.ValueNumberCol + ")[1]" - minGroup = "min(" + vss.ValueNumberCol + ")" - maxGroup = "max(" + vss.ValueNumberCol + ")" - medGroup = "median(" + vss.ValueNumberCol + ")" + avgGroup = "avg(" + vss.ValueNumberCol + ")" + randFloatGroup = "groupArraySample(1, %d)(" + vss.ValueNumberCol + ")[1]" + minGroup = "min(" + vss.ValueNumberCol + ")" + maxGroup = "max(" + vss.ValueNumberCol + ")" + medGroup = "median(" + vss.ValueNumberCol + ")" + firstFloatGroup = "argMin(" + vss.ValueNumberCol + ", " + vss.TimestampCol + ")" + lastFloatGroup = "argMax(" + vss.ValueNumberCol + ", " + vss.TimestampCol + ")" ) // Aggregation functions for string signals. const ( - randStringGroup = "groupArraySample(1, %d)(" + vss.ValueStringCol + ")[1]" - uniqueGroup = "arrayStringConcat(groupUniqArray(" + vss.ValueStringCol + "),',')" - topGroup = "arrayStringConcat(topK(1, 10)(" + vss.ValueStringCol + "))" + randStringGroup = "groupArraySample(1, %d)(" + vss.ValueStringCol + ")[1]" + uniqueGroup = "arrayStringConcat(groupUniqArray(" + vss.ValueStringCol + "),',')" + topGroup = "arrayStringConcat(topK(1, 10)(" + vss.ValueStringCol + "))" + firstStringGroup = "argMin(" + vss.ValueStringCol + ", " + vss.TimestampCol + ")" + lastStringGroup = "argMax(" + vss.ValueStringCol + ", " + vss.TimestampCol + ")" ) // TODO: remove this map when we move to storing the device address @@ -145,6 +149,10 @@ func getFloatAggFunc(aggType model.FloatAggregation) string { aggStr = maxGroup case model.FloatAggregationMed: aggStr = medGroup + case model.FloatAggregationFirst: + aggStr = firstFloatGroup + case model.FloatAggregationLast: + aggStr = lastFloatGroup } return aggStr } @@ -160,6 +168,10 @@ func getStringAgg(aggType model.StringAggregation) string { aggStr = uniqueGroup case model.StringAggregationTop: aggStr = topGroup + case model.StringAggregationFirst: + aggStr = firstStringGroup + case model.StringAggregationLast: + aggStr = lastStringGroup } return aggStr } diff --git a/schema/base.graphqls b/schema/base.graphqls index 09fb3a4..2be1084 100644 --- a/schema/base.graphqls +++ b/schema/base.graphqls @@ -33,8 +33,7 @@ type Query { """ SignalsLatest returns the latest signals for a given token. """ - signalsLatest(tokenId: Int!, filter: SignalFilter): SignalCollection - @requiresToken + signalsLatest(tokenId: Int!, filter: SignalFilter): SignalCollection @requiresToken } type SignalAggregations { """ @@ -56,6 +55,8 @@ enum FloatAggregation { MAX MIN RAND + FIRST + LAST } enum StringAggregation { @@ -71,6 +72,14 @@ enum StringAggregation { Return a list of unique values in the group. """ UNIQUE + """ + Return value in group associated with the minimum time value. + """ + FIRST + """ + Return value in group associated with the maximum time value. + """ + LAST } type SignalFloat { """