-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathsubscriptions.go
180 lines (156 loc) · 6.21 KB
/
subscriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package nakadi
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/pkg/errors"
)
// AuthorizationAttribute represents a record for SubscriptionAuthorization which is used by the Nakadi high level API.
type AuthorizationAttribute struct {
DataType string `json:"data_type"`
Value string `json:"value"`
}
// SubscriptionAuthorization represents a subscription auth as used by the Nakadi high level API.
type SubscriptionAuthorization struct {
Admins []AuthorizationAttribute `json:"admins"`
Readers []AuthorizationAttribute `json:"readers"`
}
// Subscription represents a subscription as used by the Nakadi high level API.
type Subscription struct {
ID string `json:"id,omitempty"`
OwningApplication string `json:"owning_application"`
EventTypes []string `json:"event_types"`
ConsumerGroup string `json:"consumer_group,omitempty"`
ReadFrom string `json:"read_from,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
Authorization *SubscriptionAuthorization `json:"authorization,omitempty"`
}
// SubscriptionOptions is a set of optional parameters used to configure the SubscriptionAPI.
type SubscriptionOptions struct {
// Whether methods of the SubscriptionAPI retry when a request fails. If
// set to true InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime have
// no effect (default: false).
Retry bool
// The initial (minimal) retry interval used for the exponential backoff algorithm
// when retry is enables.
InitialRetryInterval time.Duration
// MaxRetryInterval the maximum retry interval. Once the exponential backoff reaches
// this value the retry intervals remain constant.
MaxRetryInterval time.Duration
// MaxElapsedTime is the maximum time spent on retries when performing a request.
// Once this value was reached the exponential backoff is halted and the request will
// fail with an error.
MaxElapsedTime time.Duration
}
func (o *SubscriptionOptions) withDefaults() *SubscriptionOptions {
var copyOptions SubscriptionOptions
if o != nil {
copyOptions = *o
}
if copyOptions.InitialRetryInterval == 0 {
copyOptions.InitialRetryInterval = defaultInitialRetryInterval
}
if copyOptions.MaxRetryInterval == 0 {
copyOptions.MaxRetryInterval = defaultMaxRetryInterval
}
if copyOptions.MaxElapsedTime == 0 {
copyOptions.MaxElapsedTime = defaultMaxElapsedTime
}
return ©Options
}
// NewSubscriptionAPI crates a new instance of the SubscriptionAPI. As for all sub APIs of the `go-nakadi` package
// NewSubscriptionAPI receives a configured Nakadi client. The last parameter is a struct containing only optional \
// parameters. The options may be nil.
func NewSubscriptionAPI(client *Client, options *SubscriptionOptions) *SubscriptionAPI {
options = options.withDefaults()
return &SubscriptionAPI{
client: client,
backOffConf: backOffConfiguration{
Retry: options.Retry,
InitialRetryInterval: options.InitialRetryInterval,
MaxRetryInterval: options.MaxRetryInterval,
MaxElapsedTime: options.MaxElapsedTime}}
}
// SubscriptionAPI is a sub API that is used to manage subscriptions.
type SubscriptionAPI struct {
client *Client
backOffConf backOffConfiguration
}
// List returns all available subscriptions.
func (s *SubscriptionAPI) List() ([]*Subscription, error) {
subscriptions := struct {
Items []*Subscription `json:"items"`
}{}
err := s.client.httpGET(s.backOffConf.create(), s.subBaseURL(), &subscriptions, "unable to request subscriptions")
if err != nil {
return nil, err
}
return subscriptions.Items, nil
}
// Get obtains a single subscription identified by its ID.
func (s *SubscriptionAPI) Get(id string) (*Subscription, error) {
subscription := &Subscription{}
err := s.client.httpGET(s.backOffConf.create(), s.subURL(id), subscription, "unable to request subscription")
if err != nil {
return nil, err
}
return subscription, err
}
// Create initializes a new subscription. If the subscription already exists the pre-existing subscription
// is returned.
func (s *SubscriptionAPI) Create(subscription *Subscription) (*Subscription, error) {
const errMsg = "unable to create subscription"
response, err := s.client.httpPOST(s.backOffConf.create(), s.subBaseURL(), subscription, errMsg)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusCreated {
buffer, err := io.ReadAll(response.Body)
if err != nil {
return nil, errors.Wrapf(err, "%s: unable to read response body", errMsg)
}
return nil, decodeResponseToError(buffer, errMsg)
}
subscription = &Subscription{}
err = json.NewDecoder(response.Body).Decode(subscription)
if err != nil {
return nil, errors.Wrapf(err, "%s: unable to decode response body", errMsg)
}
return subscription, nil
}
// Delete removes an existing subscription.
func (s *SubscriptionAPI) Delete(id string) error {
return s.client.httpDELETE(s.backOffConf.create(), s.subURL(id), "unable to delete subscription")
}
// SubscriptionStats represents detailed statistics for the subscription
type SubscriptionStats struct {
EventType string `json:"event_type"`
Partitions []*PartitionStats `json:"partitions"`
}
// PartitionStats represents statistic information for the particular partition
type PartitionStats struct {
Partition string `json:"partition"`
State string `json:"state"`
UnconsumedEvents int `json:"unconsumed_events"`
StreamID string `json:"stream_id"`
}
type statsResponse struct {
Items []*SubscriptionStats `json:"items"`
}
// GetStats returns statistic information for subscription
func (s *SubscriptionAPI) GetStats(id string) ([]*SubscriptionStats, error) {
stats := &statsResponse{}
if err := s.client.httpGET(s.backOffConf.create(), s.subURL(id)+"/stats", stats, "unable to get stats for subscription"); err != nil {
return nil, err
}
return stats.Items, nil
}
func (s *SubscriptionAPI) subURL(id string) string {
return fmt.Sprintf("%s/subscriptions/%s", s.client.nakadiURL, id)
}
func (s *SubscriptionAPI) subBaseURL() string {
return fmt.Sprintf("%s/subscriptions", s.client.nakadiURL)
}