Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow by period uniqueness to be based off scheduled time #734

Merged
merged 1 commit into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Snoozing a job now causes its `attempt` to be _decremented_, whereas previously the `max_attempts` would be incremented. In either case, this avoids allowing a snooze to exhaust a job's retries; however the new behavior also avoids potential issues with wrapping the `max_attempts` value, and makes it simpler to implement a `RetryPolicy` based on either `attempt` or `max_attempts`. The number of snoozes is also tracked in the job's metadata as `snoozes` for debugging purposes.

The implementation of the builtin `RetryPolicy` implementations is not changed, so this change should not cause any user-facing breakage unless you're relying on `attempt - len(errors)` for some reason. [PR #730](https://github.com/riverqueue/river/pull/730).
- `ByPeriod` uniqueness is now based off a job's `ScheduledAt` instead of the current time if it has a value. [PR #734](https://github.com/riverqueue/river/pull/734).

## [0.15.0] - 2024-12-26

Expand Down
3 changes: 2 additions & 1 deletion internal/dbunique/db_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/tidwall/sjson"

"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -123,7 +124,7 @@ func buildUniqueKeyString(timeGen baseservice.TimeGenerator, uniqueOpts *UniqueO
}

if uniqueOpts.ByPeriod != time.Duration(0) {
lowerPeriodBound := timeGen.NowUTC().Truncate(uniqueOpts.ByPeriod)
lowerPeriodBound := ptrutil.ValOrDefaultFunc(params.ScheduledAt, timeGen.NowUTC).Truncate(uniqueOpts.ByPeriod)
sb.WriteString("&period=" + lowerPeriodBound.Format(time.RFC3339))
}

Expand Down
36 changes: 29 additions & 7 deletions internal/dbunique/db_unique_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivertype"
)

Expand All @@ -30,10 +31,11 @@ func TestUniqueKey(t *testing.T) {
stubSvc.StubNowUTC(now)

tests := []struct {
name string
argsFunc func() rivertype.JobArgs
uniqueOpts UniqueOpts
expectedJSON string
name string
argsFunc func() rivertype.JobArgs
modifyInsertParamsFunc func(insertParams *rivertype.JobInsertParams)
uniqueOpts UniqueOpts
expectedJSON string
}{
{
name: "ByArgsWithMultipleUniqueStructTagsAndDefaultStates",
Expand Down Expand Up @@ -165,6 +167,22 @@ func TestUniqueKey(t *testing.T) {
uniqueOpts: UniqueOpts{ByPeriod: time.Hour, ByState: []rivertype.JobState{rivertype.JobStateCompleted}},
expectedJSON: "&kind=worker_4&period=" + now.Truncate(time.Hour).Format(time.RFC3339),
},
{
name: "PeriodFromScheduledAt",
argsFunc: func() rivertype.JobArgs {
type TaskJobArgs struct {
JobArgsStaticKind
}
return TaskJobArgs{
JobArgsStaticKind: JobArgsStaticKind{kind: "worker_4"},
}
},
modifyInsertParamsFunc: func(insertParams *rivertype.JobInsertParams) {
insertParams.ScheduledAt = ptrutil.Ptr(now.Add(time.Hour))
},
uniqueOpts: UniqueOpts{ByPeriod: time.Hour},
expectedJSON: "&kind=worker_4&period=" + now.Add(time.Hour).Truncate(time.Hour).Format(time.RFC3339),
},
{
name: "ExcludeKindByArgs",
argsFunc: func() rivertype.JobArgs {
Expand Down Expand Up @@ -228,7 +246,7 @@ func TestUniqueKey(t *testing.T) {
states = tt.uniqueOpts.ByState
}

jobParams := &rivertype.JobInsertParams{
insertParams := &rivertype.JobInsertParams{
Args: args,
CreatedAt: &now,
EncodedArgs: encodedArgs,
Expand All @@ -241,12 +259,16 @@ func TestUniqueKey(t *testing.T) {
UniqueStates: UniqueStatesToBitmask(states),
}

uniqueKeyPreHash, err := buildUniqueKeyString(stubSvc, &tt.uniqueOpts, jobParams)
if tt.modifyInsertParamsFunc != nil {
tt.modifyInsertParamsFunc(insertParams)
}

uniqueKeyPreHash, err := buildUniqueKeyString(stubSvc, &tt.uniqueOpts, insertParams)
require.NoError(t, err)
require.Equal(t, tt.expectedJSON, uniqueKeyPreHash)
expectedHash := sha256.Sum256([]byte(tt.expectedJSON))

uniqueKey, err := UniqueKey(stubSvc, &tt.uniqueOpts, jobParams)
uniqueKey, err := UniqueKey(stubSvc, &tt.uniqueOpts, insertParams)
require.NoError(t, err)
require.NotNil(t, uniqueKey)

Expand Down
Loading