kafka reporter sendBatch #148

Open · wants to merge 4 commits into base: master
156 changes: 140 additions & 16 deletions reporter/kafka/kafka.go
@@ -18,27 +18,44 @@ Package kafka implements a Kafka reporter to send spans to a Kafka server/cluster
package kafka

import (
	"log"
	"os"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	"github.com/openzipkin/zipkin-go/model"
	"github.com/openzipkin/zipkin-go/reporter"
)

// defaultKafkaTopic sets the standard Kafka topic our Reporter will publish
// on. The default topic for zipkin-collector/kafka is "zipkin", see:
// https://github.com/openzipkin/zipkin/tree/master/zipkin-collector/kafka

// defaults
const (
	defaultBatchInterval = time.Second * 1 // BatchInterval in seconds
	defaultBatchSize     = 100
	defaultMaxBacklog    = 1000
Contributor:
@jeqo are these good defaults?

Member:
IIUC we are batching 100, and the max size is 1000 bytes (?).

We have these defaults in the java reporter: https://github.com/openzipkin/zipkin-reporter-java/blob/3f466e56012384ee685dd5cc91012129d312289b/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L81-L90

Max request size is 1MB (the default in the Kafka producer), but we have discussed reducing it to avoid overly large batches, so 500KB should be a good default.

Make sure to align producer properties with the reporter ones, similar to https://github.com/openzipkin/zipkin-reporter-java/blob/3f466e56012384ee685dd5cc91012129d312289b/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L131-L140

Author (@sixiaobai, Oct 8, 2019):
@jeqo It is similar to the http reporter:
- When the batch size reaches BatchSize, a collect is triggered.
- When the batch size reaches MaxBacklog, spans from the beginning of the batch are disposed.

So the real batch size is dynamic, between 100 and 1000 (excluding clock-triggered sends). Assuming an average span size of 500 bytes, the max request size would be 500KB, so 1000 should be a good default. The average span size is the thing we need to focus on.

Max request size is also 1MB in the sarama Kafka producer: https://github.com/Shopify/sarama/blob/master/config.go#L411
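
To make that producer/reporter alignment concrete, here is a minimal sketch (not part of this PR's diff); the 500KB cap, backlog value, and broker address are illustrative assumptions, and Config.Producer.MaxMessageBytes is sarama's per-message cap (1MB by default):

package main

import (
	"log"

	"github.com/Shopify/sarama"
	"github.com/openzipkin/zipkin-go/reporter/kafka"
)

func main() {
	// Illustrative: cap a single produced message at ~500KB instead of
	// sarama's 1MB default, matching the sizing discussed above.
	cfg := sarama.NewConfig()
	cfg.Producer.MaxMessageBytes = 500 * 1024

	p, err := sarama.NewAsyncProducer([]string{"192.0.2.10:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Keep the reporter-side limit consistent with the producer cap:
	// 1000 spans * ~500 bytes/span ≈ 500KB per batch.
	rep, err := kafka.NewReporter(
		[]string{"192.0.2.10:9092"},
		kafka.Producer(p),
		kafka.MaxBacklog(1000),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer rep.Close()
}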

defaultKafkaTopic = "zipkin"
)

// kafkaReporter implements Reporter by publishing spans to a Kafka
// broker.
type kafkaReporter struct {
	producer      sarama.AsyncProducer
	logger        *log.Logger
	topic         string
	serializer    reporter.SpanSerializer
	batchInterval time.Duration
	batchSize     int
	maxBacklog    int
	batchMtx      *sync.Mutex
	batch         []*model.SpanModel
	spanC         chan *model.SpanModel
	sendC         chan struct{}
	quit          chan struct{}
	shutdown      chan error
}

// ReporterOption sets a parameter for the kafkaReporter
@@ -59,6 +76,24 @@ func Producer(p sarama.AsyncProducer) ReporterOption {
	}
}

// BatchSize sets the maximum batch size, after which a collect will be
// triggered. The default batch size is 100 traces.
func BatchSize(n int) ReporterOption {
	return func(r *kafkaReporter) { r.batchSize = n }
}

// BatchInterval sets the maximum duration we will buffer traces before
// emitting them to the collector. The default batch interval is 1 second.
func BatchInterval(d time.Duration) ReporterOption {
	return func(r *kafkaReporter) { r.batchInterval = d }
}

// MaxBacklog sets the maximum backlog size. When batch size reaches this
// threshold, spans from the beginning of the batch will be disposed.
func MaxBacklog(n int) ReporterOption {
	return func(r *kafkaReporter) { r.maxBacklog = n }
}

// Topic sets the kafka topic to attach the reporter producer on.
Member:
nit: dup line

func Topic(t string) ReporterOption {
	return func(c *kafkaReporter) {
@@ -80,9 +115,18 @@ func Serializer(serializer reporter.SpanSerializer) ReporterOption {
// TCP endpoints of the form "host:port".
func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter, error) {
	r := &kafkaReporter{
		logger:        log.New(os.Stderr, "", log.LstdFlags),
		topic:         defaultKafkaTopic,
		serializer:    reporter.JSONSerializer{},
		batchInterval: defaultBatchInterval,
		batchSize:     defaultBatchSize,
		maxBacklog:    defaultMaxBacklog,
		batch:         []*model.SpanModel{},
		spanC:         make(chan *model.SpanModel),
		sendC:         make(chan struct{}, 1),
		quit:          make(chan struct{}, 1),
		shutdown:      make(chan error, 1),
		batchMtx:      &sync.Mutex{},
	}

	for _, option := range options {
@@ -96,6 +140,8 @@ func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter, error) {
		r.producer = p
	}

	go r.loop()
	go r.sendLoop()
	go r.logErrors()

	return r, nil
@@ -108,21 +154,99 @@ func (r *kafkaReporter) logErrors() {
}

func (r *kafkaReporter) Send(s model.SpanModel) {
	r.spanC <- &s
}

func (r *kafkaReporter) Close() error {
	close(r.quit)
	<-r.shutdown
	return r.producer.Close()
}

func (r *kafkaReporter) loop() {
	var (
		nextSend   = time.Now().Add(r.batchInterval)
		ticker     = time.NewTicker(r.batchInterval / 10)
		tickerChan = ticker.C
	)
	defer ticker.Stop()

	for {
		select {
		case span := <-r.spanC:
			currentBatchSize := r.append(span)
			if currentBatchSize >= r.batchSize {
				nextSend = time.Now().Add(r.batchInterval)
				r.enqueueSend()
			}
		case <-tickerChan:
			if time.Now().After(nextSend) {
				nextSend = time.Now().Add(r.batchInterval)
				r.enqueueSend()
			}
		case <-r.quit:
			close(r.sendC)
			return
		}
	}
}

func (r *kafkaReporter) sendLoop() {
	for range r.sendC {
		_ = r.sendBatch()
	}
	r.shutdown <- r.sendBatch()
}

func (r *kafkaReporter) enqueueSend() {
	select {
	case r.sendC <- struct{}{}:
	default:
		// Do nothing if there's a pending send request already
	}
}

func (r *kafkaReporter) sendBatch() error {
	// Select all current spans in the batch to be sent
	r.batchMtx.Lock()
	sendBatch := r.batch[:]
	r.batchMtx.Unlock()

	if len(sendBatch) == 0 {
		return nil
	}

	m, err := r.serializer.Serialize(sendBatch)
	if err != nil {
		r.logger.Printf("failed when marshalling the spans: %s\n", err.Error())
		return err
	}

	r.producer.Input() <- &sarama.ProducerMessage{
		Topic: r.topic,
		Key:   nil,
		Value: sarama.ByteEncoder(m),
	}

	// Remove sent spans from the batch even if they were not saved
	r.batchMtx.Lock()
	r.batch = r.batch[len(sendBatch):]
	r.batchMtx.Unlock()

	return nil
}

func (r *kafkaReporter) append(span *model.SpanModel) (newBatchSize int) {
	r.batchMtx.Lock()

	r.batch = append(r.batch, span)
	if len(r.batch) > r.maxBacklog {
		dispose := len(r.batch) - r.maxBacklog
		r.logger.Printf("backlog too long, disposing %d spans", dispose)
		r.batch = r.batch[dispose:]
	}
	newBatchSize = len(r.batch)

	r.batchMtx.Unlock()
	return
}
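
For context on how the new options compose, a usage sketch assuming this PR's API lands as written (broker address and tuning values are illustrative):

package main

import (
	"log"
	"time"

	"github.com/openzipkin/zipkin-go/model"
	"github.com/openzipkin/zipkin-go/reporter/kafka"
)

func main() {
	rep, err := kafka.NewReporter(
		[]string{"192.0.2.10:9092"},
		kafka.BatchSize(100),             // flush once 100 spans are buffered...
		kafka.BatchInterval(time.Second), // ...or once per second, whichever comes first
		kafka.MaxBacklog(1000),           // past 1000 buffered spans, the oldest are dropped
	)
	if err != nil {
		log.Fatal(err)
	}
	defer rep.Close()

	// Spans are buffered and flushed in batches by the background loop,
	// one Kafka message per batch, instead of one message per span.
	rep.Send(model.SpanModel{
		SpanContext: model.SpanContext{TraceID: model.TraceID{Low: 1}, ID: model.ID(1)},
		Name:        "example-op",
	})
}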
3 changes: 3 additions & 0 deletions reporter/kafka/kafka_test.go
@@ -68,6 +68,7 @@ func TestKafkaProduce(t *testing.T) {
	c, err := kafka.NewReporter(
		[]string{"192.0.2.10:9092"},
		kafka.Producer(p),
		kafka.BatchInterval(time.Millisecond*50),
	)
	if err != nil {
		t.Fatal(err)
	}
@@ -87,6 +88,7 @@ func TestKafkaProduceProto(t *testing.T) {
[]string{"192.0.2.10:9092"},
kafka.Producer(p),
kafka.Serializer(zipkin_proto3.SpanSerializer{}),
kafka.BatchInterval(time.Millisecond*50),
)
if err != nil {
t.Fatal(err)
@@ -147,6 +149,7 @@ func TestKafkaErrors(t *testing.T) {
[]string{"192.0.2.10:9092"},
kafka.Producer(p),
kafka.Logger(log.New(&chanWriter{errs}, "", log.LstdFlags)),
kafka.BatchInterval(time.Millisecond*50),
)
if err != nil {
t.Fatal(err)
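
And a possible batching regression test, sketched under stated assumptions: it relies on sarama's mocks package rather than this repo's test stubs, and on the PR's semantics that the second Send with BatchSize(2) triggers a flush of one Kafka message containing a JSON array of both spans:

package kafka_test

import (
	"encoding/json"
	"testing"

	"github.com/Shopify/sarama"
	"github.com/Shopify/sarama/mocks"
	"github.com/openzipkin/zipkin-go/model"
	"github.com/openzipkin/zipkin-go/reporter/kafka"
)

func TestKafkaBatchedProduce(t *testing.T) {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // let the mock surface produced messages
	p := mocks.NewAsyncProducer(t, cfg)
	p.ExpectInputAndSucceed()

	c, err := kafka.NewReporter(
		[]string{"192.0.2.10:9092"},
		kafka.Producer(p),
		kafka.BatchSize(2), // the second Send should trigger the flush
	)
	if err != nil {
		t.Fatal(err)
	}
	defer c.Close()

	c.Send(model.SpanModel{
		SpanContext: model.SpanContext{TraceID: model.TraceID{Low: 1}, ID: model.ID(2)},
		Name:        "first",
	})
	c.Send(model.SpanModel{
		SpanContext: model.SpanContext{TraceID: model.TraceID{Low: 1}, ID: model.ID(3)},
		Name:        "second",
	})

	// Both spans should arrive in a single produced message.
	msg := <-p.Successes()
	b, err := msg.Value.Encode()
	if err != nil {
		t.Fatal(err)
	}
	var spans []model.SpanModel
	if err := json.Unmarshal(b, &spans); err != nil {
		t.Fatal(err)
	}
	if len(spans) != 2 {
		t.Fatalf("expected 2 spans in a single message, got %d", len(spans))
	}
}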