Merge pull request #108 from DIMO-Network/ex-3247-improve-aggregation-timestamps-for-telemetry-api

Ex 3247 improve aggregation timestamps for telemetry api
KevinJoiner authored Jan 17, 2025
2 parents a8a4e48 + 3202801 commit 589bd35
Showing 4 changed files with 99 additions and 16 deletions.
24 changes: 13 additions & 11 deletions e2e/aproximate_location_test.go
@@ -1,6 +1,7 @@
package e2e_test

import (
"fmt"
"testing"
"time"

@@ -127,52 +128,53 @@ func TestApproximateLocation(t *testing.T) {
assert.Equal(t, expectedEndLatLong.Lng, result.SignalLatest.ApproxLong.Value)
assert.Nil(t, result.SignalLatest.Lat)
assert.Nil(t, result.SignalLatest.Long)

query = `query {
signals(tokenId:39718, from: "2020-04-15T09:21:19Z", to: "2025-04-27T09:21:19Z", interval:"24h"){
fromTime := "2024-11-19T09:21:19Z"
fromtTimePlus24 := "2024-11-20T09:21:19Z"
query = fmt.Sprintf(`query {
signals(tokenId:39718, from: "%s", to: "2025-04-27T09:21:19Z", interval:"24h"){
timestamp
currentLocationApproximateLatitude(agg: FIRST)
currentLocationApproximateLongitude(agg: FIRST)
}
}`
}`, fromTime)
// Execute request
aggResult := ApproxAgg{}
err = telemetryClient.Post(query, &aggResult, WithToken(token))
require.NoError(t, err)

require.Len(t, aggResult.Signals, 2)
// Assert the results
assert.Equal(t, locationTime.Add(-time.Hour*24).Truncate(time.Hour*24).Format(time.RFC3339), aggResult.Signals[0].Timestamp)
assert.Equal(t, fromTime, aggResult.Signals[0].Timestamp)
assert.Equal(t, expectedStartLatLong.Lat, *aggResult.Signals[0].ApproxLat)
assert.Equal(t, expectedStartLatLong.Lng, *aggResult.Signals[0].ApproxLong)

assert.Equal(t, locationTime.Truncate(time.Hour*24).Format(time.RFC3339), aggResult.Signals[1].Timestamp)
assert.Equal(t, fromtTimePlus24, aggResult.Signals[1].Timestamp)
assert.Equal(t, expectedEndLatLong.Lat, *aggResult.Signals[1].ApproxLat)
assert.Equal(t, expectedEndLatLong.Lng, *aggResult.Signals[1].ApproxLong)

query = `query {
signals(tokenId:39718, from: "2020-04-15T09:21:19Z", to: "2025-04-27T09:21:19Z", interval:"24h"){
query = fmt.Sprintf(`query {
signals(tokenId:39718, from: "%s", to: "2025-04-27T09:21:19Z", interval:"24h"){
timestamp
currentLocationApproximateLatitude(agg: FIRST)
currentLocationApproximateLongitude(agg: FIRST)
currentLocationLatitude(agg: FIRST)
currentLocationLongitude(agg: FIRST)
}
}`
}`, fromTime)
// Execute request
aggResult = ApproxAgg{}
err = telemetryClient.Post(query, &aggResult, WithToken(token))
require.Error(t, err)

// Assert the results
require.Len(t, aggResult.Signals, 2)
assert.Equal(t, locationTime.Add(-time.Hour*24).Truncate(time.Hour*24).Format(time.RFC3339), aggResult.Signals[0].Timestamp)
assert.Equal(t, fromTime, aggResult.Signals[0].Timestamp)
assert.Equal(t, expectedStartLatLong.Lat, *aggResult.Signals[0].ApproxLat)
assert.Equal(t, expectedStartLatLong.Lng, *aggResult.Signals[0].ApproxLong)
assert.Nil(t, aggResult.Signals[0].Lat)
assert.Nil(t, aggResult.Signals[0].Long)

assert.Equal(t, locationTime.Truncate(time.Hour*24).Format(time.RFC3339), aggResult.Signals[1].Timestamp)
assert.Equal(t, fromtTimePlus24, aggResult.Signals[1].Timestamp)
assert.Equal(t, expectedEndLatLong.Lat, *aggResult.Signals[1].ApproxLat)
assert.Equal(t, expectedEndLatLong.Lng, *aggResult.Signals[1].ApproxLong)
assert.Nil(t, aggResult.Signals[1].Lat)
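For context on the assertion changes above: with the origin-aligned bucketing introduced in internal/service/ch/queries.go later in this diff, group timestamps start at the request's "from" value instead of at truncated day boundaries. A minimal, self-contained sketch of the expected timestamps, assuming 24h buckets anchored at "from" (illustration only, not code from this commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Buckets are anchored at the query's "from" timestamp, not at midnight.
	from, _ := time.Parse(time.RFC3339, "2024-11-19T09:21:19Z")
	interval := 24 * time.Hour

	first := from                // expected aggResult.Signals[0].Timestamp
	second := from.Add(interval) // expected aggResult.Signals[1].Timestamp

	fmt.Println(first.Format(time.RFC3339), second.Format(time.RFC3339))
	// Output: 2024-11-19T09:21:19Z 2024-11-20T09:21:19Z
}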
4 changes: 3 additions & 1 deletion e2e/setup_test.go
@@ -30,6 +30,7 @@ var (
testServices *TestServices
once sync.Once
cleanupOnce sync.Once
srvcLock sync.Mutex
cleanup func()
)

@@ -43,7 +44,7 @@ func TestMain(m *testing.M) {
// GetTestServices returns singleton instances of all test services
func GetTestServices(t *testing.T) *TestServices {
t.Helper()

srvcLock.Lock()
once.Do(func() {
settings := config.Settings{
Port: 8080,
@@ -92,6 +93,7 @@ func GetTestServices(t *testing.T) *TestServices {
})
}
})
srvcLock.Unlock()
return testServices
}

65 changes: 65 additions & 0 deletions internal/service/ch/ch_test.go
@@ -470,6 +470,71 @@ func (c *CHServiceTestSuite) TestExecutionTimeout() {
c.Require().True(errors.Is(err, context.DeadlineExceeded), "Expected error to be DeadlineExceeded, got %v", err)
}

func (c *CHServiceTestSuite) TestOrginGrouping() {
ctx := context.Background()
conn, err := c.container.GetClickHouseAsConn()
c.Require().NoError(err, "Failed to get clickhouse connection")

// Set up test data for February 2024
startTime := time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC)
endTime := time.Date(2024, 2, 28, 23, 59, 59, 0, time.UTC)

// Create test signals - one per day in February
var signals []vss.Signal
currentTime := startTime
for currentTime.Before(endTime) {
signal := vss.Signal{
Name: vss.FieldSpeed,
Timestamp: currentTime,
Source: "test/origin",
TokenID: 100,
ValueNumber: 100.0,
}
signals = append(signals, signal)
currentTime = currentTime.Add(24 * time.Hour)
}

// Insert signals
batch, err := conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s", vss.TableName))
c.Require().NoError(err, "Failed to prepare batch")

for _, sig := range signals {
err := batch.AppendStruct(&sig)
c.Require().NoError(err, "Failed to append struct")
}
err = batch.Send()
c.Require().NoError(err, "Failed to send batch")

// Create aggregation query args
aggArgs := &model.AggregatedSignalArgs{
SignalArgs: model.SignalArgs{
TokenID: 100,
},
FromTS: startTime,
ToTS: endTime,
Interval: 28 * day.Milliseconds(),
FloatArgs: map[model.FloatSignalArgs]struct{}{
{
Name: vss.FieldSpeed,
Agg: model.FloatAggregationAvg,
}: {},
},
}

// Query signals
result, err := c.chService.GetAggregatedSignals(ctx, aggArgs)
c.Require().NoError(err, "Failed to get aggregated signals")

// We expect exactly one group since we're using a 28-day interval
c.Require().Len(result, 1, "Expected exactly one group")

// Verify the group's timestamp matches the start time
c.Require().Equal(startTime, result[0].Timestamp, "Group timestamp should match start time")

// Verify the average value (should be 100.0 since all values are 100.0)
c.Require().Equal(100.0, result[0].ValueNumber, "Unexpected average value")
}

// insertTestData inserts test data into the clickhouse database.
// it loops for 10 iterations and inserts 2 signals with each iteration that have a value of i and a powertrain type of "value"+ n%3+1
// The source is selected from a list of sources in a round robin fashion of sources[i%3].
22 changes: 18 additions & 4 deletions internal/service/ch/queries.go
@@ -107,9 +107,23 @@ func withSource(source string) qm.QueryMod {
}

// selectInterval adds a SELECT clause to the query to select the interval group based on the given milliSeconds.
// Example: 'SELECT toStartOfInterval(Timestamp, toIntervalMillisecond(?)) as group_timestamp'.
func selectInterval(milliSeconds int64) qm.QueryMod {
return qm.Select(fmt.Sprintf("toStartOfInterval(%s, toIntervalMillisecond(%d)) as %s", vss.TimestampCol, milliSeconds, IntervalGroup))
// Normalize timestamps relative to a specific origin point (by subtracting it)
// Round to interval boundaries using toStartOfInterval
// Restore the original time reference (by adding the origin back).
func selectInterval(milliSeconds int64, origin time.Time) qm.QueryMod {
// TODO (Kevin): Replace this function with simpler toStartOfInterval once ClickHouse prod server is >= v24.9
// return qm.Select(fmt.Sprintf("toStartOfInterval(%s, toIntervalMillisecond(%d), fromUnixTimestamp64Micro(%d)) as %s", vss.TimestampCol, milliSeconds, origin.UnixMicro(), intervalGroup))
// https://github.com/ClickHouse/ClickHouse/commit/2c35d53bf67cd80edb4389feac11bcff67233eeb
return qm.Select(fmt.Sprintf(`
fromUnixTimestamp64Micro(
toUnixTimestamp64Micro(
toStartOfInterval(
fromUnixTimestamp64Micro(toUnixTimestamp64Micro(%s) - %d),
toIntervalMillisecond(%d)
)
) + %d
) as %s`,
vss.TimestampCol, origin.UnixMicro(), milliSeconds, origin.UnixMicro(), IntervalGroup))
}

func selectNumberAggs(numberAggs []model.FloatSignalArgs) qm.QueryMod {
@@ -330,7 +344,7 @@ func getAggQuery(aggArgs *model.AggregatedSignalArgs) (string, []any, error) {
mods := []qm.QueryMod{
qm.Select(vss.NameCol),
qm.Select(AggCol),
selectInterval(aggArgs.Interval),
selectInterval(aggArgs.Interval, aggArgs.FromTS),
selectNumberAggs(floatArgs),
selectStringAggs(stringArgs),
qm.Where(tokenIDWhere, aggArgs.TokenID),
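To make the selectInterval workaround concrete: the generated expression shifts each timestamp back by the origin (the query's FromTS), rounds it down to the interval, then shifts it forward again, so buckets are anchored at the caller's "from" time rather than the Unix epoch. Below is a minimal Go sketch of the same arithmetic, not part of the commit, assuming timestamps are never earlier than the origin (the query already filters rows to the from/to window):

package main

import (
	"fmt"
	"time"
)

// bucketStart mirrors the ClickHouse expression built in selectInterval:
// subtract the origin, round down to an interval boundary, then add the origin back.
func bucketStart(ts, origin time.Time, interval time.Duration) time.Time {
	offset := ts.Sub(origin)             // normalize relative to the origin
	rounded := offset.Truncate(interval) // round down to an interval boundary (ts >= origin assumed)
	return origin.Add(rounded)           // restore the original time reference
}

func main() {
	origin, _ := time.Parse(time.RFC3339, "2024-11-19T09:21:19Z")
	ts, _ := time.Parse(time.RFC3339, "2024-11-20T10:00:00Z")
	// Prints 2024-11-20T09:21:19Z: the second 24h bucket, anchored at the origin.
	fmt.Println(bucketStart(ts, origin, 24*time.Hour).Format(time.RFC3339))
}

The e2e assertions earlier in this diff follow the same rule: the returned group timestamps equal fromTime and fromTime plus one interval.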
