Skip to content

Commit

Permalink
Merge pull request #440 from Feggah/feature/message-retention-duration
Browse files Browse the repository at this point in the history
Add messageRetentionDuration to PubSub Topic
  • Loading branch information
Feggah authored Oct 10, 2022
2 parents d85e3cc + 1ea920f commit 8d58196
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 5 deletions.
17 changes: 17 additions & 0 deletions apis/pubsub/v1alpha1/topic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ type TopicParameters struct {
// +optional
MessageStoragePolicy *MessageStoragePolicy `json:"messageStoragePolicy,omitempty"`

// MessageRetentionDuration: Indicates the minimum duration to retain a
// message after it is published to the topic. If this field is set,
// messages published to the topic in the last
// `message_retention_duration` are always available to subscribers. For
// instance, it allows any attached subscription to seek to a timestamp
// (https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time)
// that is up to `message_retention_duration` in the past. If this field
// is not set, message retention is controlled by settings on individual
// subscriptions. Cannot be more than 31 days or less than 10 minutes.
//
// The duration must be in seconds, terminated by 's'. Example: "1200s".
// Avoid using fractional digits.
//
// +kubebuilder:validation:Pattern=[0-9]+s$
// +optional
MessageRetentionDuration *string `json:"messageRetentionDuration,omitempty"`

// KmsKeyName is the resource name of the Cloud KMS CryptoKey to be used to
// protect access to messages published on this topic.
//
Expand Down
5 changes: 5 additions & 0 deletions apis/pubsub/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions package/crds/pubsub.gcp.crossplane.io_topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ spec:
type: string
description: Labels are used as additional metadata on Topic.
type: object
messageRetentionDuration:
description: "MessageRetentionDuration: Indicates the minimum
duration to retain a message after it is published to the topic.
If this field is set, messages published to the topic in the
last `message_retention_duration` are always available to subscribers.
For instance, it allows any attached subscription to seek to
a timestamp (https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time)
that is up to `message_retention_duration` in the past. If this
field is not set, message retention is controlled by settings
on individual subscriptions. Cannot be more than 31 days or
less than 10 minutes. \n The duration must be in seconds, terminated
by 's'. Example: \"1200s\". Avoid using fractional digits."
pattern: '[0-9]+s$'
type: string
messageStoragePolicy:
description: MessageStoragePolicy is the policy constraining the
set of Google Cloud Platform regions where messages published
Expand Down
35 changes: 34 additions & 1 deletion pkg/clients/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package topic
import (
"fmt"
"strings"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
pubsub "google.golang.org/api/pubsub/v1"

"github.com/crossplane-contrib/provider-gcp/apis/pubsub/v1alpha1"
Expand Down Expand Up @@ -48,6 +50,9 @@ func GenerateTopic(name string, s v1alpha1.TopicParameters) *pubsub.Topic {
AllowedPersistenceRegions: s.MessageStoragePolicy.AllowedPersistenceRegions,
}
}
if s.MessageRetentionDuration != nil {
t.MessageRetentionDuration = gcp.StringValue(s.MessageRetentionDuration)
}
return t
}

Expand All @@ -66,13 +71,35 @@ func LateInitialize(s *v1alpha1.TopicParameters, t pubsub.Topic) {
if s.MessageStoragePolicy == nil && t.MessageStoragePolicy != nil {
s.MessageStoragePolicy = &v1alpha1.MessageStoragePolicy{AllowedPersistenceRegions: t.MessageStoragePolicy.AllowedPersistenceRegions}
}
if s.MessageRetentionDuration == nil && len(t.MessageRetentionDuration) != 0 {
s.MessageRetentionDuration = gcp.StringPtr(t.MessageRetentionDuration)
}
}

// IsUpToDate checks whether Topic is configured with given TopicParameters.
func IsUpToDate(s v1alpha1.TopicParameters, t pubsub.Topic) bool {
observed := &v1alpha1.TopicParameters{}
LateInitialize(observed, t)
return cmp.Equal(observed, &s)

observedDuration := convertDuration(observed.MessageRetentionDuration)
sDuration := convertDuration(s.MessageRetentionDuration)
if observedDuration != sDuration {
return false
}

return cmp.Equal(observed, &s, cmpopts.IgnoreFields(v1alpha1.TopicParameters{}, "MessageRetentionDuration"))
}

func convertDuration(duration *string) time.Duration {
if duration == nil {
return 0
}

// From here we know that "duration" has a valid duration string
// format because of the kubebuilder Pattern validator, so we can
// ignore time.ParseDuration errors
d, _ := time.ParseDuration(*duration)
return d
}

// GenerateUpdateRequest produces an UpdateTopicRequest with the difference
Expand All @@ -92,6 +119,12 @@ func GenerateUpdateRequest(name string, s v1alpha1.TopicParameters, t pubsub.Top
}
}
}
if !cmp.Equal(s.MessageRetentionDuration, observed.MessageRetentionDuration) {
mask = append(mask, "messageRetentionDuration")
if s.MessageRetentionDuration != nil {
ut.Topic.MessageRetentionDuration = gcp.StringValue(s.MessageRetentionDuration)
}
}
if !cmp.Equal(s.Labels, observed.Labels) {
mask = append(mask, "labels")
ut.Topic.Labels = s.Labels
Expand Down
14 changes: 10 additions & 4 deletions pkg/clients/topic/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func params() *v1alpha1.TopicParameters {
MessageStoragePolicy: &v1alpha1.MessageStoragePolicy{
AllowedPersistenceRegions: []string{"bar", "foo"},
},
KmsKeyName: gcp.StringPtr("mykms"),
KmsKeyName: gcp.StringPtr("mykms"),
MessageRetentionDuration: gcp.StringPtr("600s"),
}
}

Expand All @@ -52,7 +53,8 @@ func topic() *pubsub.Topic {
MessageStoragePolicy: &pubsub.MessageStoragePolicy{
AllowedPersistenceRegions: []string{"bar", "foo"},
},
KmsKeyName: "mykms",
KmsKeyName: "mykms",
MessageRetentionDuration: "600s",
}
}

Expand Down Expand Up @@ -121,6 +123,10 @@ func TestIsUpToDate(t *testing.T) {
obs pubsub.Topic
param v1alpha1.TopicParameters
}

upToDateTopic := topic()
upToDateTopic.MessageRetentionDuration = "600.00s"

cases := map[string]struct {
args
result bool
Expand All @@ -136,7 +142,7 @@ func TestIsUpToDate(t *testing.T) {
},
"UpToDate": {
args: args{
obs: *topic(),
obs: *upToDateTopic,
param: *params(),
},
result: true,
Expand Down Expand Up @@ -175,7 +181,7 @@ func TestGenerateUpdateRequest(t *testing.T) {
},
result: &pubsub.UpdateTopicRequest{
Topic: withoutKMS,
UpdateMask: "messageStoragePolicy,labels",
UpdateMask: "messageStoragePolicy,messageRetentionDuration,labels",
},
},
}
Expand Down

0 comments on commit 8d58196

Please sign in to comment.