From cfda6e3209d94987228b9d6ba5363ccf6cb3e51e Mon Sep 17 00:00:00 2001 From: wanglongfei Date: Wed, 25 Sep 2019 19:12:12 +0800 Subject: [PATCH 1/4] kafka reporter sendBatch Signed-off-by: wanglongfei --- reporter/kafka/kafka.go | 143 +++++++++++++++++++++++++++++++---- reporter/kafka/kafka_test.go | 3 + 2 files changed, 133 insertions(+), 13 deletions(-) diff --git a/reporter/kafka/kafka.go b/reporter/kafka/kafka.go index a50986f..b54a7f7 100644 --- a/reporter/kafka/kafka.go +++ b/reporter/kafka/kafka.go @@ -21,6 +21,8 @@ import ( "encoding/json" "log" "os" + "sync" + "time" "github.com/Shopify/sarama" "github.com/openzipkin/zipkin-go/model" @@ -30,15 +32,31 @@ import ( // defaultKafkaTopic sets the standard Kafka topic our Reporter will publish // on. The default topic for zipkin-receiver-kafka is "zipkin", see: // https://github.com/openzipkin/zipkin/tree/master/zipkin-receiver-kafka -const defaultKafkaTopic = "zipkin" + +// defaults +const ( + defaultBatchInterval = time.Second * 1 // BatchInterval in seconds + defaultBatchSize = 100 + defaultMaxBacklog = 1000 + defaultKafkaTopic = "zipkin" +) // kafkaReporter implements Reporter by publishing spans to a Kafka // broker. type kafkaReporter struct { - producer sarama.AsyncProducer - logger *log.Logger - topic string - serializer reporter.SpanSerializer + producer sarama.AsyncProducer + logger *log.Logger + topic string + serializer reporter.SpanSerializer + batchInterval time.Duration + batchSize int + maxBacklog int + sendMtx *sync.Mutex + batchMtx *sync.Mutex + batch []*model.SpanModel + spanC chan *model.SpanModel + quit chan struct{} + shutdown chan error } // ReporterOption sets a parameter for the kafkaReporter @@ -59,6 +77,25 @@ func Producer(p sarama.AsyncProducer) ReporterOption { } } +// BatchSize sets the maximum batch size, after which a collect will be +// triggered. The default batch size is 100 traces. +func BatchSize(n int) ReporterOption { + return func(r *kafkaReporter) { r.batchSize = n } +} + +// BatchInterval sets the maximum duration we will buffer traces before +// emitting them to the collector. The default batch interval is 1 second. +func BatchInterval(d time.Duration) ReporterOption { + return func(r *kafkaReporter) { r.batchInterval = d } +} + +// MaxBacklog sets the maximum backlog size. When batch size reaches this +// threshold, spans from the beginning of the batch will be disposed. +func MaxBacklog(n int) ReporterOption { + return func(r *kafkaReporter) { r.maxBacklog = n } +} + +// Topic sets the kafka topic to attach the reporter producer on. // Topic sets the kafka topic to attach the reporter producer on. func Topic(t string) ReporterOption { return func(c *kafkaReporter) { @@ -80,9 +117,18 @@ func Serializer(serializer reporter.SpanSerializer) ReporterOption { // TCP endpoints of the form "host:port". 
func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter, error) { r := &kafkaReporter{ - logger: log.New(os.Stderr, "", log.LstdFlags), - topic: defaultKafkaTopic, - serializer: reporter.JSONSerializer{}, + logger: log.New(os.Stderr, "", log.LstdFlags), + topic: defaultKafkaTopic, + serializer: reporter.JSONSerializer{}, + batchInterval: defaultBatchInterval, + batchSize: defaultBatchSize, + maxBacklog: defaultMaxBacklog, + batch: []*model.SpanModel{}, + spanC: make(chan *model.SpanModel), + quit: make(chan struct{}, 1), + shutdown: make(chan error, 1), + sendMtx: &sync.Mutex{}, + batchMtx: &sync.Mutex{}, } for _, option := range options { @@ -96,6 +142,7 @@ func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter r.producer = p } + go r.loop() go r.logErrors() return r, nil @@ -108,12 +155,65 @@ func (r *kafkaReporter) logErrors() { } func (r *kafkaReporter) Send(s model.SpanModel) { + r.spanC <- &s +} + +func (r *kafkaReporter) Close() error { + close(r.quit) + <-r.shutdown + return r.producer.Close() +} + +func (r *kafkaReporter) loop() { + var ( + nextSend = time.Now().Add(r.batchInterval) + ticker = time.NewTicker(r.batchInterval / 10) + tickerChan = ticker.C + ) + defer ticker.Stop() + + for { + select { + case span := <-r.spanC: + currentBatchSize := r.append(span) + if currentBatchSize >= r.batchSize { + nextSend = time.Now().Add(r.batchInterval) + go func() { + _ = r.sendBatch() + }() + } + case <-tickerChan: + if time.Now().After(nextSend) { + nextSend = time.Now().Add(r.batchInterval) + go func() { + _ = r.sendBatch() + }() + } + case <-r.quit: + r.shutdown <- r.sendBatch() + return + } + } +} + +func (r *kafkaReporter) sendBatch() error { // Zipkin expects the message to be wrapped in an array - ss := []model.SpanModel{s} - m, err := json.Marshal(ss) + // in order to prevent sending the same batch twice + r.sendMtx.Lock() + defer r.sendMtx.Unlock() + + // Select all current spans in the batch to be sent + r.batchMtx.Lock() + sendBatch := r.batch[:] + r.batchMtx.Unlock() + + if len(sendBatch) == 0 { + return nil + } + m, err := json.Marshal(sendBatch) if err != nil { r.logger.Printf("failed when marshalling the span: %s\n", err.Error()) - return + return err } r.producer.Input() <- &sarama.ProducerMessage{ @@ -121,8 +221,25 @@ func (r *kafkaReporter) Send(s model.SpanModel) { Key: nil, Value: sarama.ByteEncoder(m), } + + // Remove sent spans from the batch even if they were not saved + r.batchMtx.Lock() + r.batch = r.batch[len(sendBatch):] + r.batchMtx.Unlock() + return nil } -func (r *kafkaReporter) Close() error { - return r.producer.Close() +func (r *kafkaReporter) append(span *model.SpanModel) (newBatchSize int) { + r.batchMtx.Lock() + + r.batch = append(r.batch, span) + if len(r.batch) > r.maxBacklog { + dispose := len(r.batch) - r.maxBacklog + r.logger.Printf("backlog too long, disposing %d spans", dispose) + r.batch = r.batch[dispose:] + } + newBatchSize = len(r.batch) + + r.batchMtx.Unlock() + return } diff --git a/reporter/kafka/kafka_test.go b/reporter/kafka/kafka_test.go index fcc5e21..9dc1aaa 100644 --- a/reporter/kafka/kafka_test.go +++ b/reporter/kafka/kafka_test.go @@ -68,6 +68,7 @@ func TestKafkaProduce(t *testing.T) { c, err := kafka.NewReporter( []string{"192.0.2.10:9092"}, kafka.Producer(p), + kafka.BatchInterval(time.Millisecond*50), ) if err != nil { t.Fatal(err) @@ -87,6 +88,7 @@ func TestKafkaProduceProto(t *testing.T) { []string{"192.0.2.10:9092"}, kafka.Producer(p), 
kafka.Serializer(zipkin_proto3.SpanSerializer{}), + kafka.BatchInterval(time.Millisecond*50), ) if err != nil { t.Fatal(err) @@ -147,6 +149,7 @@ func TestKafkaErrors(t *testing.T) { []string{"192.0.2.10:9092"}, kafka.Producer(p), kafka.Logger(log.New(&chanWriter{errs}, "", log.LstdFlags)), + kafka.BatchInterval(time.Millisecond*50), ) if err != nil { t.Fatal(err) From 4e975733e8f9a890614c012b8a08f5da9910188d Mon Sep 17 00:00:00 2001 From: wanglongfei Date: Thu, 26 Sep 2019 15:08:29 +0800 Subject: [PATCH 2/4] fix potential unbounded goroutine creation Signed-off-by: wanglongfei --- reporter/kafka/kafka.go | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/reporter/kafka/kafka.go b/reporter/kafka/kafka.go index b54a7f7..c50659c 100644 --- a/reporter/kafka/kafka.go +++ b/reporter/kafka/kafka.go @@ -51,10 +51,10 @@ type kafkaReporter struct { batchInterval time.Duration batchSize int maxBacklog int - sendMtx *sync.Mutex batchMtx *sync.Mutex batch []*model.SpanModel spanC chan *model.SpanModel + sendC chan struct{} quit chan struct{} shutdown chan error } @@ -125,9 +125,9 @@ func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter maxBacklog: defaultMaxBacklog, batch: []*model.SpanModel{}, spanC: make(chan *model.SpanModel), + sendC: make(chan struct{}, 1), quit: make(chan struct{}, 1), shutdown: make(chan error, 1), - sendMtx: &sync.Mutex{}, batchMtx: &sync.Mutex{}, } @@ -143,6 +143,7 @@ func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter } go r.loop() + go r.sendLoop() go r.logErrors() return r, nil @@ -178,29 +179,37 @@ func (r *kafkaReporter) loop() { currentBatchSize := r.append(span) if currentBatchSize >= r.batchSize { nextSend = time.Now().Add(r.batchInterval) - go func() { - _ = r.sendBatch() - }() + r.enqueueSend() } case <-tickerChan: if time.Now().After(nextSend) { nextSend = time.Now().Add(r.batchInterval) - go func() { - _ = r.sendBatch() - }() + r.enqueueSend() } case <-r.quit: - r.shutdown <- r.sendBatch() + close(r.sendC) return } } } +func (r *kafkaReporter) sendLoop() { + for range r.sendC { + _ = r.sendBatch() + } + r.shutdown <- r.sendBatch() +} + +func (r *kafkaReporter) enqueueSend() { + select { + case r.sendC <- struct{}{}: + default: + // Do nothing if there's a pending send request already + } +} + func (r *kafkaReporter) sendBatch() error { // Zipkin expects the message to be wrapped in an array - // in order to prevent sending the same batch twice - r.sendMtx.Lock() - defer r.sendMtx.Unlock() // Select all current spans in the batch to be sent r.batchMtx.Lock() From 9907f9b6c866b3f93d8b6ff9a662cc47290073d6 Mon Sep 17 00:00:00 2001 From: wanglongfei Date: Tue, 8 Oct 2019 18:07:40 +0800 Subject: [PATCH 3/4] rm dup line&update zipkin-collector url Signed-off-by: wanglongfei --- reporter/kafka/kafka.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/reporter/kafka/kafka.go b/reporter/kafka/kafka.go index c50659c..f3bd58a 100644 --- a/reporter/kafka/kafka.go +++ b/reporter/kafka/kafka.go @@ -30,8 +30,8 @@ import ( ) // defaultKafkaTopic sets the standard Kafka topic our Reporter will publish -// on. The default topic for zipkin-receiver-kafka is "zipkin", see: -// https://github.com/openzipkin/zipkin/tree/master/zipkin-receiver-kafka +// on. 
The default topic for zipkin-collector/kafka is "zipkin", see: +// https://github.com/openzipkin/zipkin/tree/master/zipkin-collector/kafka // defaults const ( @@ -95,7 +95,6 @@ func MaxBacklog(n int) ReporterOption { return func(r *kafkaReporter) { r.maxBacklog = n } } -// Topic sets the kafka topic to attach the reporter producer on. // Topic sets the kafka topic to attach the reporter producer on. func Topic(t string) ReporterOption { return func(c *kafkaReporter) { From de72b839dcc566551141d9149b10ad5aa575e25a Mon Sep 17 00:00:00 2001 From: wanglongfei Date: Mon, 18 Nov 2019 11:05:51 +0800 Subject: [PATCH 4/4] fix kafka reporter serializer conflict Signed-off-by: wanglongfei --- reporter/kafka/kafka.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/reporter/kafka/kafka.go b/reporter/kafka/kafka.go index f3bd58a..c1a4b32 100644 --- a/reporter/kafka/kafka.go +++ b/reporter/kafka/kafka.go @@ -18,7 +18,6 @@ Package kafka implements a Kafka reporter to send spans to a Kafka server/cluste package kafka import ( - "encoding/json" "log" "os" "sync" @@ -218,7 +217,7 @@ func (r *kafkaReporter) sendBatch() error { if len(sendBatch) == 0 { return nil } - m, err := json.Marshal(sendBatch) + m, err := r.serializer.Serialize(sendBatch) if err != nil { r.logger.Printf("failed when marshalling the span: %s\n", err.Error()) return err
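
A note on PATCH 2/4: the original implementation spawned a new goroutine for every flush, so a burst of incoming spans could create goroutines without bound. The fix routes all sends through a single sendLoop goroutine fed by a one-slot channel, so overlapping flush requests coalesce into one pending send. The following self-contained sketch shows that idiom in isolation; the names and timings are illustrative, not taken from the patch:

package main

import (
	"fmt"
	"time"
)

// requestFlush asks the sender goroutine to flush. If a flush request is
// already pending in the one-slot channel, the new request is dropped
// (coalesced) instead of spawning another goroutine.
func requestFlush(flushC chan struct{}) {
	select {
	case flushC <- struct{}{}:
	default: // a flush is already queued
	}
}

func main() {
	flushC := make(chan struct{}, 1) // capacity 1 => at most one pending request

	go func() { // the single sender goroutine, analogous to sendLoop
		for range flushC {
			fmt.Println("flushing at", time.Now().Format("15:04:05.000"))
			time.Sleep(100 * time.Millisecond) // simulate a slow send
		}
	}()

	// A burst of requests collapses into a bounded number of flushes.
	for i := 0; i < 10; i++ {
		requestFlush(flushC)
		time.Sleep(10 * time.Millisecond)
	}
	time.Sleep(500 * time.Millisecond)
}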
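
Putting the series together, a minimal usage sketch of the batching reporter. The broker address and option values below are placeholders (the values shown happen to match the new defaults), and a reachable Kafka broker is assumed, since NewReporter dials the given addresses when no Producer option is supplied:

package main

import (
	"log"
	"time"

	"github.com/openzipkin/zipkin-go"
	"github.com/openzipkin/zipkin-go/reporter/kafka"
)

func main() {
	// Batching options added in PATCH 1/4.
	rep, err := kafka.NewReporter(
		[]string{"localhost:9092"},       // hypothetical broker address
		kafka.BatchSize(100),             // flush once 100 spans are buffered...
		kafka.BatchInterval(time.Second), // ...or after 1s, whichever comes first
		kafka.MaxBacklog(1000),           // beyond this, the oldest spans are dropped
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close flushes the final batch before closing the producer.
	defer rep.Close()

	tracer, err := zipkin.NewTracer(rep)
	if err != nil {
		log.Fatal(err)
	}

	span := tracer.StartSpan("example-operation")
	span.Finish() // Send enqueues the span; loop batches it and triggers a flush
}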