From c4869951e81fe96b8297ed81d853eab0bd3fdeb2 Mon Sep 17 00:00:00 2001 From: Allyson-English Date: Wed, 3 Jul 2024 14:29:14 -0400 Subject: [PATCH 1/5] return first and last aggregation --- internal/graph/generated.go | 5 +-- internal/graph/model/models_gen.go | 16 ++++++---- internal/service/ch/ch_test.go | 49 +++++++++++++++++++++++++++++- internal/service/ch/queries.go | 6 ++++ schema/base.graphqls | 5 +-- 5 files changed, 70 insertions(+), 11 deletions(-) diff --git a/internal/graph/generated.go b/internal/graph/generated.go index 7f1ecfa..d741ec6 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -1104,8 +1104,7 @@ type Query { """ SignalsLatest returns the latest signals for a given token. """ - signalsLatest(tokenId: Int!, filter: SignalFilter): SignalCollection - @requiresToken + signalsLatest(tokenId: Int!, filter: SignalFilter): SignalCollection @requiresToken } type SignalAggregations { """ @@ -1127,6 +1126,8 @@ enum FloatAggregation { MAX MIN RAND + FIRST + LAST } enum StringAggregation { diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index ed75ef4..c517907 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -275,11 +275,13 @@ type SignalString struct { type FloatAggregation string const ( - FloatAggregationAvg FloatAggregation = "AVG" - FloatAggregationMed FloatAggregation = "MED" - FloatAggregationMax FloatAggregation = "MAX" - FloatAggregationMin FloatAggregation = "MIN" - FloatAggregationRand FloatAggregation = "RAND" + FloatAggregationAvg FloatAggregation = "AVG" + FloatAggregationMed FloatAggregation = "MED" + FloatAggregationMax FloatAggregation = "MAX" + FloatAggregationMin FloatAggregation = "MIN" + FloatAggregationRand FloatAggregation = "RAND" + FloatAggregationFirst FloatAggregation = "FIRST" + FloatAggregationLast FloatAggregation = "LAST" ) var AllFloatAggregation = []FloatAggregation{ @@ -288,11 +290,13 @@ var AllFloatAggregation = []FloatAggregation{ FloatAggregationMax, FloatAggregationMin, FloatAggregationRand, + FloatAggregationFirst, + FloatAggregationLast, } func (e FloatAggregation) IsValid() bool { switch e { - case FloatAggregationAvg, FloatAggregationMed, FloatAggregationMax, FloatAggregationMin, FloatAggregationRand: + case FloatAggregationAvg, FloatAggregationMed, FloatAggregationMax, FloatAggregationMin, FloatAggregationRand, FloatAggregationFirst, FloatAggregationLast: return true } return false diff --git a/internal/service/ch/ch_test.go b/internal/service/ch/ch_test.go index d490a86..c754c96 100644 --- a/internal/service/ch/ch_test.go +++ b/internal/service/ch/ch_test.go @@ -177,6 +177,54 @@ func (c *CHServiceTestSuite) TestGetAggSignal() { }, }, }, + { + name: "first", + aggArgs: model.AggregatedSignalArgs{ + SignalArgs: model.SignalArgs{ + TokenID: 1, + }, + FromTS: c.dataStartTime, + ToTS: endTs, + Interval: day.Milliseconds(), + FloatArgs: []model.FloatSignalArgs{ + { + Name: vss.FieldSpeed, + Agg: model.FloatAggregationFirst, + }, + }, + }, + expected: []vss.Signal{ + { + Name: vss.FieldSpeed, + Timestamp: c.dataStartTime, + ValueNumber: 0, + }, + }, + }, + { + name: "last", + aggArgs: model.AggregatedSignalArgs{ + SignalArgs: model.SignalArgs{ + TokenID: 1, + }, + FromTS: c.dataStartTime, + ToTS: endTs, + Interval: day.Milliseconds(), + FloatArgs: []model.FloatSignalArgs{ + { + Name: vss.FieldSpeed, + Agg: model.FloatAggregationLast, + }, + }, + }, + expected: []vss.Signal{ + { + Name: vss.FieldSpeed, + Timestamp: c.dataStartTime, + ValueNumber: 9, + }, + }, + }, } for _, tc := range testCases { c.Run(tc.name, func() { @@ -325,7 +373,6 @@ func (c *CHServiceTestSuite) insertTestData() { TokenID: 1, ValueString: fmt.Sprintf("value%d", i%3+1), } - testSignal = append(testSignal, numSig, strSig) } diff --git a/internal/service/ch/queries.go b/internal/service/ch/queries.go index a93a5f2..7e06e4d 100644 --- a/internal/service/ch/queries.go +++ b/internal/service/ch/queries.go @@ -45,6 +45,8 @@ const ( minGroup = "min(" + vss.ValueNumberCol + ")" maxGroup = "max(" + vss.ValueNumberCol + ")" medGroup = "median(" + vss.ValueNumberCol + ")" + firstGroup = "first_value(" + vss.ValueNumberCol + ")" + lastGroup = "last_value(" + vss.ValueNumberCol + ")" ) // Aggregation functions for string signals. @@ -141,6 +143,10 @@ func getFloatAggFunc(aggType model.FloatAggregation) string { aggStr = maxGroup case model.FloatAggregationMed: aggStr = medGroup + case model.FloatAggregationFirst: + aggStr = firstGroup + case model.FloatAggregationLast: + aggStr = lastGroup } return aggStr } diff --git a/schema/base.graphqls b/schema/base.graphqls index 09fb3a4..b545a71 100644 --- a/schema/base.graphqls +++ b/schema/base.graphqls @@ -33,8 +33,7 @@ type Query { """ SignalsLatest returns the latest signals for a given token. """ - signalsLatest(tokenId: Int!, filter: SignalFilter): SignalCollection - @requiresToken + signalsLatest(tokenId: Int!, filter: SignalFilter): SignalCollection @requiresToken } type SignalAggregations { """ @@ -56,6 +55,8 @@ enum FloatAggregation { MAX MIN RAND + FIRST + LAST } enum StringAggregation { From e1673ef6ff2c1ff87df16163a9528d8dc8776a9e Mon Sep 17 00:00:00 2001 From: Allyson-English Date: Mon, 8 Jul 2024 10:09:25 -0400 Subject: [PATCH 2/5] argMin/ argMax --- internal/service/ch/ch_test.go | 6 ++++-- internal/service/ch/queries.go | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/service/ch/ch_test.go b/internal/service/ch/ch_test.go index 25eb093..cbd64c8 100644 --- a/internal/service/ch/ch_test.go +++ b/internal/service/ch/ch_test.go @@ -240,11 +240,12 @@ func (c *CHServiceTestSuite) TestGetAggSignal() { }, }, }, - expected: []vss.Signal{ + expected: []model.AggSignal{ { Name: vss.FieldSpeed, Timestamp: c.dataStartTime, ValueNumber: 0, + Agg: model.FloatAggregationFirst.String(), }, }, }, @@ -264,11 +265,12 @@ func (c *CHServiceTestSuite) TestGetAggSignal() { }, }, }, - expected: []vss.Signal{ + expected: []model.AggSignal{ { Name: vss.FieldSpeed, Timestamp: c.dataStartTime, ValueNumber: 9, + Agg: model.FloatAggregationLast.String(), }, }, }, diff --git a/internal/service/ch/queries.go b/internal/service/ch/queries.go index b233855..721da46 100644 --- a/internal/service/ch/queries.go +++ b/internal/service/ch/queries.go @@ -49,8 +49,8 @@ const ( minGroup = "min(" + vss.ValueNumberCol + ")" maxGroup = "max(" + vss.ValueNumberCol + ")" medGroup = "median(" + vss.ValueNumberCol + ")" - firstGroup = "first_value(" + vss.ValueNumberCol + ")" - lastGroup = "last_value(" + vss.ValueNumberCol + ")" + firstGroup = "argMin(" + vss.ValueNumberCol + ", " + vss.TimestampCol + ")" + lastGroup = "argMax(" + vss.ValueNumberCol + ", " + vss.TimestampCol + ")" ) // Aggregation functions for string signals. From fc4f7e4284c3a62d40b546a10e9f69c321b9063a Mon Sep 17 00:00:00 2001 From: Allyson-English Date: Mon, 8 Jul 2024 11:46:58 -0400 Subject: [PATCH 3/5] first and last string aggs --- internal/graph/model/models_gen.go | 6 ++ .../repositories/repositories_mocks_test.go | 4 +- internal/service/ch/ch_test.go | 60 +++++++++++++++++-- internal/service/ch/queries.go | 30 ++++++---- schema/base.graphqls | 8 +++ 5 files changed, 89 insertions(+), 19 deletions(-) diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 143a1fb..fab97f7 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -260,12 +260,18 @@ const ( StringAggregationTop StringAggregation = "TOP" // Return a list of unique values in the group. StringAggregationUnique StringAggregation = "UNIQUE" + // Return value in group associated with the minimum time value. + StringAggregationFirst StringAggregation = "FIRST" + // Return value in group associated with the maximum time value. + StringAggregationLast StringAggregation = "LAST" ) var AllStringAggregation = []StringAggregation{ StringAggregationRand, StringAggregationTop, StringAggregationUnique, + StringAggregationFirst, + StringAggregationLast, } func (e StringAggregation) IsValid() bool { diff --git a/internal/repositories/repositories_mocks_test.go b/internal/repositories/repositories_mocks_test.go index 3b43433..cce0fcb 100644 --- a/internal/repositories/repositories_mocks_test.go +++ b/internal/repositories/repositories_mocks_test.go @@ -51,7 +51,7 @@ func (m *MockCHService) GetAggregatedSignals(ctx context.Context, aggArgs *model } // GetAggregatedSignals indicates an expected call of GetAggregatedSignals. -func (mr *MockCHServiceMockRecorder) GetAggregatedSignals(ctx, aggArgs any) *gomock.Call { +func (mr *MockCHServiceMockRecorder) GetAggregatedSignals(ctx, aggArgs interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAggregatedSignals", reflect.TypeOf((*MockCHService)(nil).GetAggregatedSignals), ctx, aggArgs) } @@ -66,7 +66,7 @@ func (m *MockCHService) GetLatestSignals(ctx context.Context, latestArgs *model. } // GetLatestSignals indicates an expected call of GetLatestSignals. -func (mr *MockCHServiceMockRecorder) GetLatestSignals(ctx, latestArgs any) *gomock.Call { +func (mr *MockCHServiceMockRecorder) GetLatestSignals(ctx, latestArgs interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLatestSignals", reflect.TypeOf((*MockCHService)(nil).GetLatestSignals), ctx, latestArgs) } diff --git a/internal/service/ch/ch_test.go b/internal/service/ch/ch_test.go index cbd64c8..6ed002f 100644 --- a/internal/service/ch/ch_test.go +++ b/internal/service/ch/ch_test.go @@ -191,7 +191,7 @@ func (c *CHServiceTestSuite) TestGetAggSignal() { { Name: vss.FieldPowertrainType, Timestamp: c.dataStartTime, - ValueString: "value2,value1,value3", + ValueString: "value10,value3,value2,value9,value7,value5,value4,value8,value1,value6", Agg: model.StringAggregationUnique.String(), }, }, @@ -225,7 +225,7 @@ func (c *CHServiceTestSuite) TestGetAggSignal() { }, }, { - name: "first", + name: "first float", aggArgs: model.AggregatedSignalArgs{ SignalArgs: model.SignalArgs{ TokenID: 1, @@ -250,7 +250,7 @@ func (c *CHServiceTestSuite) TestGetAggSignal() { }, }, { - name: "last", + name: "last float", aggArgs: model.AggregatedSignalArgs{ SignalArgs: model.SignalArgs{ TokenID: 1, @@ -269,11 +269,61 @@ func (c *CHServiceTestSuite) TestGetAggSignal() { { Name: vss.FieldSpeed, Timestamp: c.dataStartTime, - ValueNumber: 9, + ValueNumber: dataPoints - 1, Agg: model.FloatAggregationLast.String(), }, }, }, + { + name: "first string", + aggArgs: model.AggregatedSignalArgs{ + SignalArgs: model.SignalArgs{ + TokenID: 1, + }, + FromTS: c.dataStartTime, + ToTS: endTs, + Interval: day.Milliseconds(), + StringArgs: []model.StringSignalArgs{ + { + Name: vss.FieldPowertrainType, + Agg: model.StringAggregationFirst, + }, + }, + }, + expected: []model.AggSignal{ + { + Name: vss.FieldPowertrainType, + Timestamp: c.dataStartTime, + ValueString: "value1", + Agg: model.StringAggregationFirst.String(), + }, + }, + }, + { + name: "last string", + aggArgs: model.AggregatedSignalArgs{ + SignalArgs: model.SignalArgs{ + TokenID: 1, + }, + FromTS: c.dataStartTime, + ToTS: endTs, + Interval: day.Milliseconds(), + StringArgs: []model.StringSignalArgs{ + { + Name: vss.FieldPowertrainType, + Agg: model.StringAggregationLast, + }, + }, + }, + expected: []model.AggSignal{ + { + Name: vss.FieldPowertrainType, + Timestamp: c.dataStartTime, + ValueString: fmt.Sprintf("value%d", dataPoints), + Agg: model.StringAggregationLast.String(), + }, + }, + }, } for _, tc := range testCases { c.Run(tc.name, func() { @@ -423,7 +473,7 @@ func (c *CHServiceTestSuite) insertTestData() { Timestamp: c.dataStartTime.Add(time.Second * time.Duration(30*i)), Source: sources[i%3], TokenID: 1, - ValueString: fmt.Sprintf("value%d", i%3+1), + ValueString: fmt.Sprintf("value%d", i+1), } testSignal = append(testSignal, numSig, strSig) } diff --git a/internal/service/ch/queries.go b/internal/service/ch/queries.go index 721da46..33d0bbc 100644 --- a/internal/service/ch/queries.go +++ b/internal/service/ch/queries.go @@ -44,20 +44,22 @@ const ( // Aggregation functions for float signals. const ( - avgGroup = "avg(" + vss.ValueNumberCol + ")" - randFloatGroup = "groupArraySample(1, %d)(" + vss.ValueNumberCol + ")[1]" - minGroup = "min(" + vss.ValueNumberCol + ")" - maxGroup = "max(" + vss.ValueNumberCol + ")" - medGroup = "median(" + vss.ValueNumberCol + ")" - firstGroup = "argMin(" + vss.ValueNumberCol + ", " + vss.TimestampCol + ")" - lastGroup = "argMax(" + vss.ValueNumberCol + ", " + vss.TimestampCol + ")" + avgGroup = "avg(" + vss.ValueNumberCol + ")" + randFloatGroup = "groupArraySample(1, %d)(" + vss.ValueNumberCol + ")[1]" + minGroup = "min(" + vss.ValueNumberCol + ")" + maxGroup = "max(" + vss.ValueNumberCol + ")" + medGroup = "median(" + vss.ValueNumberCol + ")" + firstFloatGroup = "argMin(" + vss.ValueNumberCol + ", " + vss.TimestampCol + ")" + lastFloatGroup = "argMax(" + vss.ValueNumberCol + ", " + vss.TimestampCol + ")" ) // Aggregation functions for string signals. const ( - randStringGroup = "groupArraySample(1, %d)(" + vss.ValueStringCol + ")[1]" - uniqueGroup = "arrayStringConcat(groupUniqArray(" + vss.ValueStringCol + "),',')" - topGroup = "arrayStringConcat(topK(1, 10)(" + vss.ValueStringCol + "))" + randStringGroup = "groupArraySample(1, %d)(" + vss.ValueStringCol + ")[1]" + uniqueGroup = "arrayStringConcat(groupUniqArray(" + vss.ValueStringCol + "),',')" + topGroup = "arrayStringConcat(topK(1, 10)(" + vss.ValueStringCol + "))" + firstStringGroup = "argMin(" + vss.ValueStringCol + ", " + vss.TimestampCol + ")" + lastStringGroup = "argMax(" + vss.ValueStringCol + ", " + vss.TimestampCol + ")" ) // TODO: remove this map when we move to storing the device address @@ -148,9 +150,9 @@ func getFloatAggFunc(aggType model.FloatAggregation) string { case model.FloatAggregationMed: aggStr = medGroup case model.FloatAggregationFirst: - aggStr = firstGroup + aggStr = firstFloatGroup case model.FloatAggregationLast: - aggStr = lastGroup + aggStr = lastFloatGroup } return aggStr } @@ -166,6 +168,10 @@ func getStringAgg(aggType model.StringAggregation) string { aggStr = uniqueGroup case model.StringAggregationTop: aggStr = topGroup + case model.StringAggregationFirst: + aggStr = firstStringGroup + case model.StringAggregationLast: + aggStr = lastStringGroup } return aggStr } diff --git a/schema/base.graphqls b/schema/base.graphqls index b545a71..2be1084 100644 --- a/schema/base.graphqls +++ b/schema/base.graphqls @@ -72,6 +72,14 @@ enum StringAggregation { Return a list of unique values in the group. """ UNIQUE + """ + Return value in group associated with the minimum time value. + """ + FIRST + """ + Return value in group associated with the maximum time value. + """ + LAST } type SignalFloat { """ From accc88ad9a0d79765c1986acd85f9b66723b0171 Mon Sep 17 00:00:00 2001 From: Allyson-English Date: Mon, 8 Jul 2024 12:20:42 -0400 Subject: [PATCH 4/5] generate gql --- internal/graph/generated.go | 8 ++++++++ internal/graph/model/models_gen.go | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/graph/generated.go b/internal/graph/generated.go index 4d03d83..ec4f5d6 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -1184,6 +1184,14 @@ enum StringAggregation { Return a list of unique values in the group. """ UNIQUE + """ + Return value in group associated with the minimum time value. + """ + FIRST + """ + Return value in group associated with the maximum time value. + """ + LAST } type SignalFloat { """ diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index fab97f7..2a7b2c7 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -276,7 +276,7 @@ var AllStringAggregation = []StringAggregation{ func (e StringAggregation) IsValid() bool { switch e { - case StringAggregationRand, StringAggregationTop, StringAggregationUnique: + case StringAggregationRand, StringAggregationTop, StringAggregationUnique, StringAggregationFirst, StringAggregationLast: return true } return false From dd3ba72ae6894357ba0f97557a281382bfd1f75b Mon Sep 17 00:00:00 2001 From: Allyson-English Date: Mon, 8 Jul 2024 12:28:43 -0400 Subject: [PATCH 5/5] clean up --- internal/repositories/repositories_mocks_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/repositories/repositories_mocks_test.go b/internal/repositories/repositories_mocks_test.go index cce0fcb..3b43433 100644 --- a/internal/repositories/repositories_mocks_test.go +++ b/internal/repositories/repositories_mocks_test.go @@ -51,7 +51,7 @@ func (m *MockCHService) GetAggregatedSignals(ctx context.Context, aggArgs *model } // GetAggregatedSignals indicates an expected call of GetAggregatedSignals. -func (mr *MockCHServiceMockRecorder) GetAggregatedSignals(ctx, aggArgs interface{}) *gomock.Call { +func (mr *MockCHServiceMockRecorder) GetAggregatedSignals(ctx, aggArgs any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAggregatedSignals", reflect.TypeOf((*MockCHService)(nil).GetAggregatedSignals), ctx, aggArgs) } @@ -66,7 +66,7 @@ func (m *MockCHService) GetLatestSignals(ctx context.Context, latestArgs *model. } // GetLatestSignals indicates an expected call of GetLatestSignals. -func (mr *MockCHServiceMockRecorder) GetLatestSignals(ctx, latestArgs interface{}) *gomock.Call { +func (mr *MockCHServiceMockRecorder) GetLatestSignals(ctx, latestArgs any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLatestSignals", reflect.TypeOf((*MockCHService)(nil).GetLatestSignals), ctx, latestArgs) }