-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgraphite.go
297 lines (262 loc) · 7.52 KB
/
graphite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package graphite
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/kisielk/og-rek"
"net"
"regexp"
"strings"
"time"
)
// Wire protocols a Client can be configured with (stored in Client.protocol).
const (
// nop is the zero value; SendMetric returns without enqueueing anything
// when the client's protocol is nop.
nop = iota
// plain makes the worker marshal metrics with Metric.PlainB.
plain
// pickle makes the worker marshal metrics with Metric.PickleB.
pickle
)
// Metric represents a graphite metric.
// NOTE: Value should be a numeric type, e.g. int, int32, int64, float64, etc.
// The type of Value is not checked: the plain text marshaller formats it with
// "%v" and the result is sent to the graphite server as-is.
type Metric struct {
// Name is the metric name.
Name string
// Value is the sample value; see the note above about accepted types.
Value interface{}
// Timestamp is the sample time. SendSimple fills it with
// time.Now().Unix() (Unix seconds) when the caller passes 0.
Timestamp int64
}
// Plain marshals the metric using the plain text protocol.
//
// The result is a single newline-terminated line of the form
// "<prefix><name> <value> <timestamp>". prefix is put directly in front of
// the metric name, without any separator being inserted.
func (m *Metric) Plain(prefix string) string {
	line := fmt.Sprintf("%s%s %v %v\n", prefix, m.Name, m.Value, m.Timestamp)
	return line
}
// PlainB marshals the metric using the plain text protocol and returns the
// line as a byte slice. It is the []byte counterpart of Plain and takes the
// same prefix argument.
func (m *Metric) PlainB(prefix string) []byte {
	line := m.Plain(prefix)
	return []byte(line)
}
// PickleB marshals the metric using the pickle protocol, into a byte slice.
//
// The returned message is the pickled payload preceded by its length as a
// 4-byte big-endian unsigned integer. prefix is put directly in front of the
// metric name, exactly as Plain does, so both protocols report the same path.
// If the metric cannot be pickled, PickleB returns nil instead of a
// length-framed partial payload.
func (m *Metric) PickleB(prefix string) []byte {
	// code taken from: https://github.com/graphite-ng/carbon-relay-ng/blob/master/destination/pickle.go
	dataBuf := &bytes.Buffer{}
	pickler := ogórek.NewEncoder(dataBuf)

	// pickle format (in python talk): [(path, (timestamp, value)), ...]
	point := []interface{}{prefix + m.Name, []interface{}{m.Timestamp, m.Value}}
	list := []interface{}{point}
	if err := pickler.Encode(list); err != nil {
		// There is no error return in this signature; sending nothing is
		// safer than sending a truncated pickle to the server.
		return nil
	}

	messageBuf := &bytes.Buffer{}
	// Writes into a bytes.Buffer cannot fail, so the results are not checked.
	binary.Write(messageBuf, binary.BigEndian, uint32(dataBuf.Len()))
	messageBuf.Write(dataBuf.Bytes())
	return messageBuf.Bytes()
}
// Client represents a graphite client.
// Metrics handed to the Send* methods are queued in metricPool and written to
// the server by the worker method.
type Client struct {
prefix string // passed to the marshaller of every metric before it is sent to the server
network string // network passed to net.Dial; NewClient sets "tcp" or "udp"
address string // graphite server address, in format "$host:$port"
protocol int // nop, plain or pickle
// delay is how long reconnect sleeps between failed dial attempts.
delay time.Duration
// metricPool is the send buffer: SendMetric enqueues, worker dequeues.
metricPool chan *Metric
// conn is the current server connection; nil when not connected.
conn net.Conn
// shutdown is set by Shutdown; once true SendMetric drops metrics and
// reconnect stops retrying.
shutdown bool
// workerStopped is closed by worker when it exits; Shutdown waits on it.
workerStopped chan struct{}
// NOTE(review): stopSendChans is not referenced anywhere in the visible
// part of this file — confirm whether it is still needed.
stopSendChans []chan struct{}
}
// SendMetric sends a metric to the server.
// SendMetric does not block the caller, it just enqueues the metric to a send buffer and returns;
// the metric is not guaranteed to be sent.
//
// The metric is silently dropped when it is nil, when the client is a nop
// client, when the client has been shut down, or when the send buffer is full.
func (c *Client) SendMetric(metric *Metric) {
	// A nil metric would be dereferenced by the worker goroutine while
	// marshalling, which would crash the whole process; drop it here instead.
	if metric == nil || c.protocol == nop || c.shutdown {
		return
	}
	defer func() {
		// edge case when panic would happen:
		// 1. user called SendMetric(), the above c.shutdown check has passed, and this routine paused
		// 2. user called Shutdown(), metricPool closed
		// 3. this routine regains control, and tries to send the metric to metricPool, panic happens
		recover()
	}()
	select {
	case c.metricPool <- metric:
	default: // channel full, silently drop, don't block caller
	}
}
// SendMetrics sends a batch of metrics to the server by handing each element,
// in slice order, to SendMetric.
// Like SendMetric it does not block the caller: metrics are only enqueued to
// the send buffer, and none of them is guaranteed to be sent.
func (c *Client) SendMetrics(metrics []*Metric) {
	for i := 0; i < len(metrics); i++ {
		c.SendMetric(metrics[i])
	}
}
// SendSimple builds a Metric from its arguments and passes it to SendMetric,
// so the caller does not have to construct a Metric object.
// It does not block the caller: the metric is only enqueued to the send
// buffer and is not guaranteed to be sent.
// If timestamp is 0, the current time (time.Now().Unix()) is used.
func (c *Client) SendSimple(name string, value interface{}, timestamp int64) {
	m := &Metric{Name: name, Value: value, Timestamp: timestamp}
	if m.Timestamp == 0 {
		m.Timestamp = time.Now().Unix()
	}
	c.SendMetric(m)
}
// SendChan receives metrics from ch and passes each one to SendMetric.
// SendChan blocks the caller until ch is closed and drained.
func (c *Client) SendChan(ch chan *Metric) {
	for metric := range ch {
		c.SendMetric(metric)
	}
}
// Shutdown closes the client.
// After shutdown, the client won't accept any metrics to send.
// Shutdown blocks the caller until the worker has exited (all buffered
// metrics were handled) or until timeout elapses, in which case the
// connection is forcibly closed.
//
// Shutdown is safe to call on a client without a send buffer (such as one
// returned by NewNopClient) and may be called more than once sequentially.
// NOTE(review): the shutdown flag is a plain bool, so concurrent Shutdown
// calls are still not synchronized.
func (c *Client) Shutdown(timeout time.Duration) {
	// set the shutdown mark
	alreadyShutdown := c.shutdown
	c.shutdown = true

	if c.metricPool == nil {
		// No send buffer means no worker to wait for; closing a nil channel
		// would panic.
		return
	}
	if !alreadyShutdown {
		// Closing an already closed channel panics, so only the first call closes.
		close(c.metricPool)
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// wait for the worker routine to exit, or for the timeout
	select {
	case <-c.workerStopped:
		// shutdown normally
	case <-timer.C:
		// Timeout: forcibly close the connection to unblock a pending write.
		// c.conn is deliberately NOT set to nil here: the worker calls
		// c.conn.Close() after a failed write and would hit a nil interface.
		// The worker resets the field itself.
		if conn := c.conn; conn != nil {
			conn.Close()
		}
	}
}
// cleanPrefix cleans up a caller supplied prefix by removing leading and
// trailing white space (spaces, tabs, newlines, ...), which would otherwise
// end up inside the metric line.
// Dots are left untouched: Plain puts the prefix directly in front of the
// metric name without inserting a separator, so a trailing dot is significant.
func cleanPrefix(prefix string) string {
	// TODO check for invalid characters
	return strings.TrimSpace(prefix)
}
// findStringSubmatchMap matches s against r and returns the named capture
// groups as a map from group name to captured text. Unnamed groups are
// skipped. The map is empty (never nil) when s does not match.
func findStringSubmatchMap(r *regexp.Regexp, s string) map[string]string {
	named := map[string]string{}
	groups := r.FindStringSubmatch(s)
	if groups != nil {
		names := r.SubexpNames()
		// index 0 is the whole match, which never carries a name
		for idx := 1; idx < len(names); idx++ {
			if key := names[idx]; key != "" {
				named[key] = groups[idx]
			}
		}
	}
	return named
}
// graphiteURLPattern parses graphite urls of the form ${protocol}://${host}:${port}.
// Named groups: n = protocol, h = host, p = port.
var graphiteURLPattern = regexp.MustCompile(`^((?P<n>(tcp|udp|pickle))://)?(?P<h>[^:]+)(:(?P<p>\d+))?$`)

// NewClient creates a graphite client. If prefix is specified, all metrics' names will be
// prefixed before sending to graphite.
//
// url has the format ${protocol}://${host}:${port}:
//   - ${protocol} is optional, valid values: tcp, udp, pickle (via tcp); defaults to tcp
//   - ${host} is mandatory
//   - ${port} is optional, defaults to 2003
//
// reconnectDelay is the pause between connection attempts while the server is
// unreachable. An error is returned when url cannot be parsed. On success a
// worker goroutine is started to send the queued metrics.
func NewClient(url string, prefix string, reconnectDelay time.Duration) (*Client, error) {
	captures := findStringSubmatchMap(graphiteURLPattern, url)
	scheme, host, port := captures["n"], captures["h"], captures["p"]
	if host == "" {
		return nil, errors.New("Invalid graphite url: " + url)
	}
	if port == "" {
		port = "2003"
	}

	var network string
	var protocol int
	switch scheme {
	case "", "tcp":
		// An omitted protocol must fall back to plain text over tcp; without
		// this the client would be left with the nop protocol and silently
		// drop every metric.
		network, protocol = "tcp", plain
	case "udp":
		network, protocol = "udp", plain
	case "pickle":
		network, protocol = "tcp", pickle
	}

	client := &Client{
		prefix:        cleanPrefix(prefix),
		network:       network,
		address:       fmt.Sprintf("%s:%s", host, port),
		protocol:      protocol,
		delay:         reconnectDelay,
		metricPool:    make(chan *Metric, 10000),
		workerStopped: make(chan struct{}, 1),
	}
	go client.worker()
	return client, nil
}
// NewNopClient creates a graphite client that does nothing: its protocol is
// nop, so SendMetric returns without enqueueing. No send buffer is allocated
// and no worker goroutine is started. The returned error is always nil.
func NewNopClient() (*Client, error) {
	client := new(Client)
	client.protocol = nop
	client.shutdown = false
	return client, nil
}
// reconnect establishes the connection to the server. Any existing connection
// is closed first. It then dials c.address on c.network in a loop and blocks
// the caller until a dial succeeds or the client is shut down; on return
// c.conn is nil if no connection could be established.
func (c *Client) reconnect() {
	if old := c.conn; old != nil {
		old.Close()
		c.conn = nil
	}

	// once the client is shut down and no connection is established, give up
	for !c.shutdown {
		conn, err := net.Dial(c.network, c.address)
		c.conn = conn
		if err == nil {
			return
		}
		// connection failed, wait for the configured delay and retry
		time.Sleep(c.delay)
	}
}
// worker pulls metrics from metricPool and writes them to the graphite server
// using the client's protocol. It runs until metricPool is closed and
// drained, or until a connection is needed but reconnect gave up.
// On exit it closes workerStopped and then closes any open connection.
func (c *Client) worker() {
	defer func() {
		close(c.workerStopped)
		if c.conn != nil {
			c.conn.Close()
			c.conn = nil
		}
	}()

	// the loop ends when metricPool is closed, i.e. the client was shut down
	for m := range c.metricPool {
		if c.conn == nil {
			// no connection established, or last write failed
			c.reconnect()
		}
		if c.conn == nil {
			// reconnect() blocks until successfully connected,
			// unless the client is shut down, in which case we give up
			return
		}

		var payload []byte
		switch c.protocol {
		case plain:
			payload = m.PlainB(c.prefix)
		case pickle:
			payload = m.PickleB(c.prefix)
		default:
			// wrong protocol, nothing to send for this metric
			continue
		}

		if _, err := c.conn.Write(payload); err != nil {
			// if the write fails, don't resend this metric:
			// we do not guarantee all metrics are sent
			c.conn.Close()
			c.conn = nil
		}
	}
}