diff --git a/reporter/kafka/kafka.go b/reporter/kafka/kafka.go
index a50986f..c1a4b32 100644
--- a/reporter/kafka/kafka.go
+++ b/reporter/kafka/kafka.go
@@ -18,9 +18,10 @@ Package kafka implements a Kafka reporter to send spans to a Kafka server/cluste
 package kafka
 
 import (
-	"encoding/json"
 	"log"
 	"os"
+	"sync"
+	"time"
 
 	"github.com/Shopify/sarama"
 	"github.com/openzipkin/zipkin-go/model"
@@ -28,17 +29,33 @@ import (
 )
 
 // defaultKafkaTopic sets the standard Kafka topic our Reporter will publish
-// on. The default topic for zipkin-receiver-kafka is "zipkin", see:
-// https://github.com/openzipkin/zipkin/tree/master/zipkin-receiver-kafka
-const defaultKafkaTopic = "zipkin"
+// on. The default topic for zipkin-collector/kafka is "zipkin", see:
+// https://github.com/openzipkin/zipkin/tree/master/zipkin-collector/kafka
+
+// defaults
+const (
+	defaultBatchInterval = time.Second * 1 // BatchInterval in seconds
+	defaultBatchSize     = 100
+	defaultMaxBacklog    = 1000
+	defaultKafkaTopic    = "zipkin"
+)
 
 // kafkaReporter implements Reporter by publishing spans to a Kafka
 // broker.
 type kafkaReporter struct {
-	producer   sarama.AsyncProducer
-	logger     *log.Logger
-	topic      string
-	serializer reporter.SpanSerializer
+	producer      sarama.AsyncProducer
+	logger        *log.Logger
+	topic         string
+	serializer    reporter.SpanSerializer
+	batchInterval time.Duration
+	batchSize     int
+	maxBacklog    int
+	batchMtx      *sync.Mutex
+	batch         []*model.SpanModel
+	spanC         chan *model.SpanModel
+	sendC         chan struct{}
+	quit          chan struct{}
+	shutdown      chan error
 }
 
 // ReporterOption sets a parameter for the kafkaReporter
@@ -59,6 +76,24 @@ func Producer(p sarama.AsyncProducer) ReporterOption {
 	}
 }
 
+// BatchSize sets the maximum batch size, after which a collect will be
+// triggered. The default batch size is 100 traces.
+func BatchSize(n int) ReporterOption {
+	return func(r *kafkaReporter) { r.batchSize = n }
+}
+
+// BatchInterval sets the maximum duration we will buffer traces before
+// emitting them to the collector. The default batch interval is 1 second.
+func BatchInterval(d time.Duration) ReporterOption {
+	return func(r *kafkaReporter) { r.batchInterval = d }
+}
+
+// MaxBacklog sets the maximum backlog size. When batch size reaches this
+// threshold, spans from the beginning of the batch will be disposed.
+func MaxBacklog(n int) ReporterOption {
+	return func(r *kafkaReporter) { r.maxBacklog = n }
+}
+
 // Topic sets the kafka topic to attach the reporter producer on.
 func Topic(t string) ReporterOption {
 	return func(c *kafkaReporter) {
@@ -80,9 +115,18 @@ func Serializer(serializer reporter.SpanSerializer) ReporterOption {
 // TCP endpoints of the form "host:port".
 func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter, error) {
 	r := &kafkaReporter{
-		logger:     log.New(os.Stderr, "", log.LstdFlags),
-		topic:      defaultKafkaTopic,
-		serializer: reporter.JSONSerializer{},
+		logger:        log.New(os.Stderr, "", log.LstdFlags),
+		topic:         defaultKafkaTopic,
+		serializer:    reporter.JSONSerializer{},
+		batchInterval: defaultBatchInterval,
+		batchSize:     defaultBatchSize,
+		maxBacklog:    defaultMaxBacklog,
+		batch:         []*model.SpanModel{},
+		spanC:         make(chan *model.SpanModel),
+		sendC:         make(chan struct{}, 1),
+		quit:          make(chan struct{}, 1),
+		shutdown:      make(chan error, 1),
+		batchMtx:      &sync.Mutex{},
 	}
 
 	for _, option := range options {
@@ -96,6 +140,8 @@ func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter
 		r.producer = p
 	}
 
+	go r.loop()
+	go r.sendLoop()
 	go r.logErrors()
 
 	return r, nil
@@ -108,12 +154,73 @@ func (r *kafkaReporter) logErrors() {
 }
 
 func (r *kafkaReporter) Send(s model.SpanModel) {
+	r.spanC <- &s
+}
+
+func (r *kafkaReporter) Close() error {
+	close(r.quit)
+	<-r.shutdown
+	return r.producer.Close()
+}
+
+func (r *kafkaReporter) loop() {
+	var (
+		nextSend   = time.Now().Add(r.batchInterval)
+		ticker     = time.NewTicker(r.batchInterval / 10)
+		tickerChan = ticker.C
+	)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case span := <-r.spanC:
+			currentBatchSize := r.append(span)
+			if currentBatchSize >= r.batchSize {
+				nextSend = time.Now().Add(r.batchInterval)
+				r.enqueueSend()
+			}
+		case <-tickerChan:
+			if time.Now().After(nextSend) {
+				nextSend = time.Now().Add(r.batchInterval)
+				r.enqueueSend()
+			}
+		case <-r.quit:
+			close(r.sendC)
+			return
+		}
+	}
+}
+
+func (r *kafkaReporter) sendLoop() {
+	for range r.sendC {
+		_ = r.sendBatch()
+	}
+	r.shutdown <- r.sendBatch()
+}
+
+func (r *kafkaReporter) enqueueSend() {
+	select {
+	case r.sendC <- struct{}{}:
+	default:
+		// Do nothing if there's a pending send request already
+	}
+}
+
+func (r *kafkaReporter) sendBatch() error {
 	// Zipkin expects the message to be wrapped in an array
-	ss := []model.SpanModel{s}
-	m, err := json.Marshal(ss)
+
+	// Select all current spans in the batch to be sent
+	r.batchMtx.Lock()
+	sendBatch := r.batch[:]
+	r.batchMtx.Unlock()
+
+	if len(sendBatch) == 0 {
+		return nil
+	}
+
+	m, err := r.serializer.Serialize(sendBatch)
 	if err != nil {
 		r.logger.Printf("failed when marshalling the span: %s\n", err.Error())
-		return
+		return err
 	}
 
 	r.producer.Input() <- &sarama.ProducerMessage{
@@ -121,8 +228,25 @@ func (r *kafkaReporter) Send(s model.SpanModel) {
 		Key:   nil,
 		Value: sarama.ByteEncoder(m),
 	}
+
+	// Remove sent spans from the batch even if they were not saved
+	r.batchMtx.Lock()
+	r.batch = r.batch[len(sendBatch):]
+	r.batchMtx.Unlock()
+
+	return nil
 }
 
-func (r *kafkaReporter) Close() error {
-	return r.producer.Close()
+func (r *kafkaReporter) append(span *model.SpanModel) (newBatchSize int) {
+	r.batchMtx.Lock()
+
+	r.batch = append(r.batch, span)
+	if len(r.batch) > r.maxBacklog {
+		dispose := len(r.batch) - r.maxBacklog
+		r.logger.Printf("backlog too long, disposing %d spans", dispose)
+		r.batch = r.batch[dispose:]
+	}
+	newBatchSize = len(r.batch)
+
+	r.batchMtx.Unlock()
+	return
 }
diff --git a/reporter/kafka/kafka_test.go b/reporter/kafka/kafka_test.go
index fcc5e21..9dc1aaa 100644
--- a/reporter/kafka/kafka_test.go
+++ b/reporter/kafka/kafka_test.go
@@ -68,6 +68,7 @@ func TestKafkaProduce(t *testing.T) {
 	c, err := kafka.NewReporter(
 		[]string{"192.0.2.10:9092"},
 		kafka.Producer(p),
+		kafka.BatchInterval(time.Millisecond*50),
 	)
 	if err != nil {
 		t.Fatal(err)
@@ -87,6 +88,7 @@ func TestKafkaProduceProto(t *testing.T) {
 		[]string{"192.0.2.10:9092"},
 		kafka.Producer(p),
 		kafka.Serializer(zipkin_proto3.SpanSerializer{}),
+		kafka.BatchInterval(time.Millisecond*50),
 	)
 	if err != nil {
 		t.Fatal(err)
@@ -147,6 +149,7 @@ func TestKafkaErrors(t *testing.T) {
 		[]string{"192.0.2.10:9092"},
 		kafka.Producer(p),
 		kafka.Logger(log.New(&chanWriter{errs}, "", log.LstdFlags)),
+		kafka.BatchInterval(time.Millisecond*50),
 	)
 	if err != nil {
 		t.Fatal(err)
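
For context, a minimal usage sketch of the batching options introduced by this change. The broker address and tuning values below are illustrative assumptions, not part of the patch; only NewReporter, BatchSize, BatchInterval, MaxBacklog, and Close come from the code above.

package main

import (
	"log"
	"time"

	"github.com/openzipkin/zipkin-go/reporter/kafka"
)

func main() {
	// Hypothetical broker address and tuning values, for illustration only.
	rep, err := kafka.NewReporter(
		[]string{"localhost:9092"},
		kafka.BatchSize(50),                       // flush once 50 spans are buffered...
		kafka.BatchInterval(500*time.Millisecond), // ...or after 500ms, whichever comes first
		kafka.MaxBacklog(500),                     // drop the oldest spans past 500 buffered
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close drains the remaining batch before shutting down the producer,
	// via the quit/shutdown channels added in this change.
	defer rep.Close()

	// The reporter is typically passed to zipkin.NewTracer; finished spans
	// are then buffered and published to the "zipkin" topic in batches.
}

Design note: sendC has capacity 1 and enqueueSend uses a non-blocking send, so concurrent triggers coalesce and at most one flush is in flight at a time, while loop's ticker (one tenth of the batch interval) enforces the time-based flush.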