Metrics: Producer metric overhaul + grafana dashboard (#90)
* publisher: kinesis: add metric for hitting throughput limits

* publisher: kinesis: refactor metrics reporting

* publisher: kinesis: add metrics for stream creation limit exceeded

* chore: kinesis: make internal names consistent

* publisher: pubsub: add metric for hitting throughput limits

* publisher: pubsub: add metric for exceeding topic creation limits

* kinesis: document bug with reporting throughput errors

* publisher: kafka: remove false positives from delivery metrics

* publisher: kafka: fix tests

* publisher: pubsub: remove false positives from delivery metrics

* publisher: kinesis: remove false positives from delivery report

* publisher: emit non-delivery metrics on topic/stream existence errors

* chore: remove redundant imports

* misc: use better var names

* publishers: add topic name to *_messages_delivered_total metric

* publisher: kinesis: make partition-key fixed length (a rough sketch follows this list)

* chore: remove typo in comments

* metrics: update metric metadata

* metrics: add topic/stream label for failed deliveries

* docs: update delivery metrics with topic/stream name

* docs: add ${PUBLISHER}_producebulk_tt_ms metric documentation

* misc: add dashboard with support for prometheus data source

* docs: add info about grafana dashboard

* chore: make dashboard portable

* misc: rename dashboard to grafana.json for better semantics
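The `kinesis.go` diff itself is among the files not rendered on this page, so the following is only a hypothetical sketch of what a fixed-length partition key can look like. It assumes `github.com/google/uuid`, which may or may not be what this commit actually uses:

```go
// Hypothetical illustration only; not taken from this commit's kinesis publisher.
package main

import (
	"fmt"

	"github.com/google/uuid"
)

// partitionKey returns a random partition key of constant length.
// A UUID string is always 36 characters, so the per-record overhead
// stays predictable and records spread evenly across shards.
func partitionKey() string {
	return uuid.NewString()
}

func main() {
	fmt.Println(partitionKey())      // e.g. "0d5a8c2e-6a6f-4a9e-9d4e-1c2b3a4d5e6f"
	fmt.Println(len(partitionKey())) // always 36
}
```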
turtleDev authored Jul 31, 2024
1 parent 963fced commit d8a60ef
Showing 12 changed files with 4,409 additions and 181 deletions.
4,067 changes: 4,067 additions & 0 deletions dashboards/grafana.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions docs/docs/guides/monitoring.md
@@ -153,3 +153,11 @@ Raccoon provides fine-grained metrics that denote latency. That gives clues as t
- [`event_processing_duration_milliseconds`](reference/metrics.md#event_processing_duration_milliseconds) This metrics denotes overall latency. You need to look at other latency metrics to find the root cause when this metric is high.
- [`server_processing_latency_milliseconds`](reference/metrics.md#server_processing_latency_milliseconds) Correlate this metric with `event_processing_duration_milliseconds` to infer whether the issue is with Raccoon itself, or something wrong with the network, or the way [sent_time](https://github.com/raystack/proton/blob/main/raystack/raccoon/v1beta1/raccoon.proto#L47) is generated.
- [`worker_processing_duration_milliseconds`](reference/metrics.md#worker_processing_duration_milliseconds) High value of this metric indicates that the publisher is slow or can't keep up.


### Dashboard

There is a pre-built [Grafana dashboard](https://github.com/raystack/raccoon/blob/main/dashboards/grafana.json) available with support for the Prometheus data source.

If you're running the StatsD + Telegraf setup, you can configure Telegraf to expose these metrics to Prometheus.

73 changes: 56 additions & 17 deletions docs/docs/reference/metrics.md
@@ -14,9 +14,6 @@ This page contains the reference for all the metrics exposed by Raccoon.
- [Kafka](#kafka)
- [PubSub](#pubsub)
- [Kinesis](#kinesis)
- [PubSub Publisher](metrics.md#pubsub-publisher)
- [Kinesis Publisher](metrics.md#kinesis-publisher)
- [Resource Usage](metrics.md#resource-usage)
- [Event Delivery](metrics.md#event-delivery)

## Server Connection
@@ -74,17 +71,17 @@ Number of connection close errors encountered
### Kafka
#### `kafka_messages_delivered_total`

Number of delivered events to Kafka. The metric also contains false increments. To find the true value, one should use the difference between this and `kafka_messages_undelivered_total` metric for the same tag/labels.
Number of events delivered to Kafka.

- Type: `Count`
- Tags: `success=false` `success=true` `conn_group=*` `event_type=*`
- Tags: `topic=topicname` `conn_group=*` `event_type=*`

#### `kafka_messages_undelivered_total`

The count of false increments done by `kafka_messages_delivered_total`. To be used in conjunction with the former for accurate metrics.
Number of events not delivered to Kafka.

- Type: `Count`
- Tags: `success=false` `success=true` `conn_group=*` `event_type=*`
- Tags: `topic=topicname` `conn_group=*` `event_type=*`


#### `kafka_unknown_topic_failure_total`
@@ -153,50 +150,92 @@ Response time of produce batch method of the kafka producer

#### `pubsub_messages_delivered_total`

Number of delivered events to PubSub. The metric also contains false increments. To find the true value, one should use the difference between this and `pubsub_messages_undelivered_total` metric for the same tag/labels.
Number of events delivered to PubSub.

- Type: `Count`
- Tags: `success=false` `success=true` `conn_group=*` `event_type=*`
- Tags: `topic=topicname` `conn_group=*` `event_type=*`

#### `pubsub_messages_undelivered_total`

The count of false increments done by `pubsub_messages_delivered_total`. To be used in conjunction with the former for accurate metrics.
Number of events that were not delivered to PubSub.

- Type: `Count`
- Tags: `success=false` `success=true` `conn_group=*` `event_type=*`
- Tags: `topic=topicname` `conn_group=*` `event_type=*`


#### `pubsub_unknown_topic_failure_total`

Number of delivery failure caused by topic does not exist in PubSub.
Number of delivery failures caused by non-existence of topic in PubSub.

- Type: `Count`
- Tags: `topic=topicname` `event_type=*` `conn_group=*`

#### `pubsub_topic_throughput_exceeded_total`

Number of delivery failures caused by exceeding throughput limits on PubSub.

- Type: `Count`
- Tags: `topic=topicname` `event_type=*` `conn_group=*`

#### `pubsub_topics_limit_exceeded_total`

Number of delivery failures caused by exceeding the limit on the number of topics in PubSub.

- Type: `Count`
- Tags: `topic=topicname` `event_type=*` `conn_group=*`

#### `pubsub_producebulk_tt_ms`

Response time of produce batch method of the pubsub producer

- Type: `Timing`
- Tags: NA

### Kinesis

#### `kinesis_messages_delivered_total`

Number of delivered events to Kinesis. The metric also contains false increments. To find the true value, one should use the difference between this and `kinesis_messages_undelivered_total` metric for the same tag/labels.
Number of events successfully delivered to Kinesis.

- Type: `Count`
- Tags: `success=false` `success=true` `conn_group=*` `event_type=*`
- Tags: `stream=streamname` `conn_group=*` `event_type=*`

#### `kinesis_messages_undelivered_total`

The count of false increments done by `kinesis_messages_delivered_total`. To be used in conjunction with the former for accurate metrics.
Number of events not delivered to Kinesis.

- Type: `Count`
- Tags: `success=false` `success=true` `conn_group=*` `event_type=*`
- Tags: `stream=streamname` `conn_group=*` `event_type=*`


#### `kinesis_unknown_stream_failure_total`

Number of delivery failure caused by stream does not exist in Kinesis.
Number of delivery failures caused by non-existence of stream in Kinesis.

- Type: `Count`
- Tags: `stream=streamname` `event_type=*` `conn_group=*`

#### `kinesis_stream_throughput_exceeded_total`

Number of delivery failures caused by exceeding shard throughput limits. This error can also occur if the size of an event exceeds the per-record size limit (1 MiB at the time of writing). See [Limits and Quotas on Kinesis](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).

- Type: `Count`
- Tags: `stream=streamname` `event_type=*` `conn_group=*`

#### `kinesis_streams_limit_exceeded_total`

Number of delivery failures caused by too many streams being in `CREATING` state. AWS Kinesis limits the number of stream creation requests that can be in progress in parallel to 5.

- Type: `Count`
- Tags: `stream=streamname` `event_type=*` `conn_group=*`

#### `kinesis_producebulk_tt_ms`

Response time of produce batch method of the kinesis producer

- Type: `Timing`
- Tags: NA

## Resource Usage

### `server_mem_gc_triggered_current`
10 changes: 8 additions & 2 deletions integration/integration_test.go
@@ -30,18 +30,24 @@ var url, wsurl string
var bootstrapServers string
var grpcServerAddr string

const envTestHost = "INTEGTEST_HOST"

func TestMain(m *testing.M) {
uuid = fmt.Sprintf("%d-test", rand.Int())
timeout = 20 * time.Second
topicFormat = os.Getenv("INTEGTEST_TOPIC_FORMAT")
wsurl = fmt.Sprintf("ws://%v/api/v1/events", os.Getenv("INTEGTEST_HOST"))
url = fmt.Sprintf("http://%v/api/v1/events", os.Getenv("INTEGTEST_HOST"))
wsurl = fmt.Sprintf("ws://%v/api/v1/events", os.Getenv(envTestHost))
url = fmt.Sprintf("http://%v/api/v1/events", os.Getenv(envTestHost))
grpcServerAddr = os.Getenv("GRPC_SERVER_ADDR")
bootstrapServers = os.Getenv("INTEGTEST_BOOTSTRAP_SERVER")
os.Exit(m.Run())
}

func TestIntegration(t *testing.T) {
if os.Getenv(envTestHost) == "" {
t.Errorf("cannot run tests because %s env variable is not set", envTestHost)
return
}
var err error
assert.NoError(t, err)
header := http.Header{
24 changes: 18 additions & 6 deletions metrics/prometheus.go
@@ -140,22 +140,34 @@ func getCounterMap() map[string]CounterVec {

counters["kafka_messages_delivered_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "kafka_messages_delivered_total",
Help: "Number of delivered events to Kafka"}, []string{"success", "conn_group", "event_type"})
Help: "Number of delivered events to Kafka"}, []string{"topic", "conn_group", "event_type"})
counters["kafka_messages_undelivered_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "kafka_messages_undelivered_total",
Help: "Number of delivered events to Kafka which failed while reading delivery report"}, []string{"success", "conn_group", "event_type"})
Help: "Number of events that failed delivery"}, []string{"topic", "conn_group", "event_type"})
counters["pubsub_messages_delivered_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pubsub_messages_delivered_total",
Help: "Number of delivered events to Kafka"}, []string{"success", "conn_group", "event_type"})
Help: "Number of delivered events to PubSub"}, []string{"topic", "conn_group", "event_type"})
counters["pubsub_messages_undelivered_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pubsub_messages_undelivered_total",
Help: "Number of delivered events to PubSub which failed while reading delivery report"}, []string{"success", "conn_group", "event_type"})
Help: "Number of events that failed delivery"}, []string{"topic", "conn_group", "event_type"})
counters["kinesis_messages_delivered_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "kinesis_messages_delivered_total",
Help: "Number of delivered events to Kafka"}, []string{"success", "conn_group", "event_type"})
Help: "Number of delivered events to Kinesis"}, []string{"stream", "conn_group", "event_type"})
counters["kinesis_messages_undelivered_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "kinesis_messages_undelivered_total",
Help: "Number of delivered events to kinesis which failed while reading delivery report"}, []string{"success", "conn_group", "event_type"})
Help: "Number of events that failed delivery"}, []string{"stream", "conn_group", "event_type"})
counters["kinesis_stream_throughput_exceeded_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "kinesis_stream_throughput_exceeded_total",
Help: "Number of messages that failed to deliver because the operation exceeded shard limit or if the message size was too big"}, []string{"stream", "conn_group", "event_type"})
counters["kinesis_streams_limit_exceeded_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "kinesis_streams_limit_exceeded_total",
Help: "Number of messages that failed to deliver because target stream creation failed due to too many stream creation requests"}, []string{"stream", "conn_group", "event_type"})
counters["pubsub_topic_throughput_exceeded_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pubsub_topic_throughput_exceeded_total",
Help: "Number of messages that failed to deliver because pub/sub throughput limits were exceeded"}, []string{"topic", "conn_group", "event_type"})
counters["pubsub_topics_limit_exceeded_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pubsub_topics_limit_exceeded_total",
Help: "Number of messages that failed to deliver because pub/sub topic limits were exceeded"}, []string{"topic", "conn_group", "event_type"})
counters["events_rx_bytes_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "events_rx_bytes_total",
Help: "Total byte receieved in requests"}, []string{"conn_group", "event_type"})
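The publisher changes that feed the new Kinesis/PubSub counters (for example `kinesis.go`) are not rendered on this page. Purely as an illustrative sketch — assuming the `metrics.Increment(name, labels)` helper used in the Kafka publisher below, the error types exposed by `aws-sdk-go-v2`, and raccoon's import paths — the error classification behind these counters could look roughly like this; the actual implementation in this commit may differ:

```go
// Hypothetical sketch only; not taken from this commit's kinesis publisher.
package kinesis

import (
	"errors"

	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"

	"github.com/raystack/raccoon/metrics"
)

// reportDeliveryFailure maps a PutRecord error onto the counters
// registered in metrics/prometheus.go above.
func reportDeliveryFailure(err error, stream, connGroup, eventType string) {
	labels := map[string]string{
		"stream":     stream,
		"conn_group": connGroup,
		"event_type": eventType,
	}

	var (
		throughputExceeded *types.ProvisionedThroughputExceededException
		limitExceeded      *types.LimitExceededException
		streamNotFound     *types.ResourceNotFoundException
	)
	switch {
	case errors.As(err, &throughputExceeded):
		metrics.Increment("kinesis_stream_throughput_exceeded_total", labels)
	case errors.As(err, &limitExceeded):
		metrics.Increment("kinesis_streams_limit_exceeded_total", labels)
	case errors.As(err, &streamNotFound):
		metrics.Increment("kinesis_unknown_stream_failure_total", labels)
	}

	// Whatever the cause, the event still counts as undelivered.
	metrics.Increment("kinesis_messages_undelivered_total", labels)
}
```

A real implementation would likely classify per-record error codes from the `PutRecords` response as well; this only shows the shape of the counter usage.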
2 changes: 1 addition & 1 deletion metrics/prometheus_test.go
@@ -117,7 +117,7 @@ func (m *mockObserver) Observe(f float64) {

func (promSuite *PrometheusTestSuite) Test_Prometheus_Collector_Metrics_Initialised() {
// NOTE(turtledev): what are we even testing here?
numCounters := 18
numCounters := 22
numGauge := 15
numHistogram := 10
var err error
61 changes: 41 additions & 20 deletions publisher/kafka/kafka.go
@@ -49,7 +49,6 @@ type Kafka struct {
}

// ProduceBulk messages to kafka. Block until all messages are sent. Return array of error. Order of Errors is guaranteed.
// DeliveryChannel needs to be exclusive. DeliveryChannel is exposed for recyclability purpose.
func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string) error {
errors := make([]error, len(events))
totalProcessed := 0
@@ -64,38 +63,60 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string) error {

err := pr.kp.Produce(message, deliveryChannel)
if err != nil {
metrics.Increment("kafka_messages_delivered_total", map[string]string{"success": "false", "conn_group": connGroup, "event_type": event.Type})
metrics.Increment(
"kafka_messages_undelivered_total",
map[string]string{
"topic": topic,
"conn_group": connGroup,
"event_type": event.Type,
},
)
if err.Error() == "Local: Unknown topic" {
errors[order] = fmt.Errorf("%v %s", err, topic)
metrics.Increment("kafka_unknown_topic_failure_total", map[string]string{"topic": topic, "event_type": event.Type, "conn_group": connGroup})
metrics.Increment(
"kafka_unknown_topic_failure_total",
map[string]string{
"topic": topic,
"event_type": event.Type,
"conn_group": connGroup,
},
)
} else {
errors[order] = err
}
continue
}
metrics.Increment(
"kafka_messages_delivered_total",
map[string]string{
"success": "true",
"conn_group": connGroup,
"event_type": event.Type,
},
)

totalProcessed++
}

// Wait for deliveryChannel as many as processed
for range totalProcessed {
d := <-deliveryChannel
m := d.(*kafka.Message)
if m.TopicPartition.Error != nil {
order := m.Opaque.(int)
eventType := events[order].Type
metrics.Increment("kafka_messages_undelivered_total", map[string]string{"success": "true", "conn_group": connGroup, "event_type": eventType})
metrics.Increment("kafka_messages_delivered_total", map[string]string{"success": "false", "conn_group": connGroup, "event_type": eventType})
errors[order] = m.TopicPartition.Error
var (
deliveryReport = <-deliveryChannel
msg = deliveryReport.(*kafka.Message)
order = msg.Opaque.(int)
eventType = events[order].Type
)
if msg.TopicPartition.Error != nil {
metrics.Increment(
"kafka_messages_undelivered_total",
map[string]string{
"topic": *msg.TopicPartition.Topic,
"conn_group": connGroup,
"event_type": eventType,
},
)
errors[order] = msg.TopicPartition.Error
continue
}
metrics.Increment(
"kafka_messages_delivered_total",
map[string]string{
"topic": *msg.TopicPartition.Topic,
"conn_group": connGroup,
"event_type": eventType,
},
)
}

if cmp.Or(errors...) != nil {
20 changes: 10 additions & 10 deletions publisher/kafka/kafka_test.go
@@ -50,6 +50,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
Offset: 0,
Error: nil,
},
Opaque: 0,
}
}()
})
@@ -65,16 +66,15 @@
client := &mockClient{}
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("buffer full")).Once()
client.On("Produce", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
go func() {
args.Get(1).(chan kafka.Event) <- &kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: args.Get(0).(*kafka.Message).TopicPartition.Topic,
Partition: 0,
Offset: 0,
Error: nil,
},
}
}()
args.Get(1).(chan kafka.Event) <- &kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: args.Get(0).(*kafka.Message).TopicPartition.Topic,
Partition: 0,
Offset: 0,
Error: nil,
},
Opaque: 1,
}
}).Once()
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("buffer full")).Once()
kp := NewFromClient(client, 10, "%s", 1)
Diffs for the remaining changed files are not rendered.
