-
Notifications
You must be signed in to change notification settings - Fork 115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kafka reporter sendBatch #148
Open
sixiaobai
wants to merge
4
commits into
openzipkin:master
Choose a base branch
from
sixiaobai:master
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,27 +18,44 @@ Package kafka implements a Kafka reporter to send spans to a Kafka server/cluste | |
package kafka | ||
|
||
import ( | ||
"encoding/json" | ||
"log" | ||
"os" | ||
"sync" | ||
"time" | ||
|
||
"github.com/Shopify/sarama" | ||
"github.com/openzipkin/zipkin-go/model" | ||
"github.com/openzipkin/zipkin-go/reporter" | ||
) | ||
|
||
// defaultKafkaTopic sets the standard Kafka topic our Reporter will publish | ||
// on. The default topic for zipkin-receiver-kafka is "zipkin", see: | ||
// https://github.com/openzipkin/zipkin/tree/master/zipkin-receiver-kafka | ||
const defaultKafkaTopic = "zipkin" | ||
// on. The default topic for zipkin-collector/kafka is "zipkin", see: | ||
// https://github.com/openzipkin/zipkin/tree/master/zipkin-collector/kafka | ||
|
||
// defaults | ||
const ( | ||
defaultBatchInterval = time.Second * 1 // BatchInterval in seconds | ||
defaultBatchSize = 100 | ||
defaultMaxBacklog = 1000 | ||
defaultKafkaTopic = "zipkin" | ||
) | ||
|
||
// kafkaReporter implements Reporter by publishing spans to a Kafka | ||
// broker. | ||
type kafkaReporter struct { | ||
producer sarama.AsyncProducer | ||
logger *log.Logger | ||
topic string | ||
serializer reporter.SpanSerializer | ||
producer sarama.AsyncProducer | ||
logger *log.Logger | ||
topic string | ||
serializer reporter.SpanSerializer | ||
batchInterval time.Duration | ||
batchSize int | ||
maxBacklog int | ||
batchMtx *sync.Mutex | ||
batch []*model.SpanModel | ||
spanC chan *model.SpanModel | ||
sendC chan struct{} | ||
quit chan struct{} | ||
shutdown chan error | ||
} | ||
|
||
// ReporterOption sets a parameter for the kafkaReporter | ||
|
@@ -59,6 +76,24 @@ func Producer(p sarama.AsyncProducer) ReporterOption { | |
} | ||
} | ||
|
||
// BatchSize sets the maximum batch size, after which a collect will be | ||
// triggered. The default batch size is 100 traces. | ||
func BatchSize(n int) ReporterOption { | ||
return func(r *kafkaReporter) { r.batchSize = n } | ||
} | ||
|
||
// BatchInterval sets the maximum duration we will buffer traces before | ||
// emitting them to the collector. The default batch interval is 1 second. | ||
func BatchInterval(d time.Duration) ReporterOption { | ||
return func(r *kafkaReporter) { r.batchInterval = d } | ||
} | ||
|
||
// MaxBacklog sets the maximum backlog size. When batch size reaches this | ||
// threshold, spans from the beginning of the batch will be disposed. | ||
func MaxBacklog(n int) ReporterOption { | ||
return func(r *kafkaReporter) { r.maxBacklog = n } | ||
} | ||
|
||
// Topic sets the kafka topic to attach the reporter producer on. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: dup line |
||
func Topic(t string) ReporterOption { | ||
return func(c *kafkaReporter) { | ||
|
@@ -80,9 +115,18 @@ func Serializer(serializer reporter.SpanSerializer) ReporterOption { | |
// TCP endpoints of the form "host:port". | ||
func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter, error) { | ||
r := &kafkaReporter{ | ||
logger: log.New(os.Stderr, "", log.LstdFlags), | ||
topic: defaultKafkaTopic, | ||
serializer: reporter.JSONSerializer{}, | ||
logger: log.New(os.Stderr, "", log.LstdFlags), | ||
topic: defaultKafkaTopic, | ||
serializer: reporter.JSONSerializer{}, | ||
batchInterval: defaultBatchInterval, | ||
batchSize: defaultBatchSize, | ||
maxBacklog: defaultMaxBacklog, | ||
batch: []*model.SpanModel{}, | ||
spanC: make(chan *model.SpanModel), | ||
sendC: make(chan struct{}, 1), | ||
quit: make(chan struct{}, 1), | ||
shutdown: make(chan error, 1), | ||
batchMtx: &sync.Mutex{}, | ||
} | ||
|
||
for _, option := range options { | ||
|
@@ -96,6 +140,8 @@ func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter | |
r.producer = p | ||
} | ||
|
||
go r.loop() | ||
go r.sendLoop() | ||
go r.logErrors() | ||
|
||
return r, nil | ||
|
@@ -108,21 +154,99 @@ func (r *kafkaReporter) logErrors() { | |
} | ||
|
||
func (r *kafkaReporter) Send(s model.SpanModel) { | ||
r.spanC <- &s | ||
} | ||
|
||
func (r *kafkaReporter) Close() error { | ||
close(r.quit) | ||
<-r.shutdown | ||
return r.producer.Close() | ||
} | ||
|
||
func (r *kafkaReporter) loop() { | ||
var ( | ||
nextSend = time.Now().Add(r.batchInterval) | ||
ticker = time.NewTicker(r.batchInterval / 10) | ||
tickerChan = ticker.C | ||
) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case span := <-r.spanC: | ||
currentBatchSize := r.append(span) | ||
if currentBatchSize >= r.batchSize { | ||
nextSend = time.Now().Add(r.batchInterval) | ||
r.enqueueSend() | ||
} | ||
case <-tickerChan: | ||
if time.Now().After(nextSend) { | ||
nextSend = time.Now().Add(r.batchInterval) | ||
r.enqueueSend() | ||
} | ||
case <-r.quit: | ||
close(r.sendC) | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (r *kafkaReporter) sendLoop() { | ||
for range r.sendC { | ||
_ = r.sendBatch() | ||
} | ||
r.shutdown <- r.sendBatch() | ||
} | ||
|
||
func (r *kafkaReporter) enqueueSend() { | ||
select { | ||
case r.sendC <- struct{}{}: | ||
default: | ||
// Do nothing if there's a pending send request already | ||
} | ||
} | ||
|
||
func (r *kafkaReporter) sendBatch() error { | ||
// Zipkin expects the message to be wrapped in an array | ||
ss := []model.SpanModel{s} | ||
m, err := json.Marshal(ss) | ||
|
||
// Select all current spans in the batch to be sent | ||
r.batchMtx.Lock() | ||
sendBatch := r.batch[:] | ||
r.batchMtx.Unlock() | ||
|
||
if len(sendBatch) == 0 { | ||
return nil | ||
} | ||
m, err := r.serializer.Serialize(sendBatch) | ||
if err != nil { | ||
r.logger.Printf("failed when marshalling the span: %s\n", err.Error()) | ||
return | ||
return err | ||
} | ||
|
||
r.producer.Input() <- &sarama.ProducerMessage{ | ||
Topic: r.topic, | ||
Key: nil, | ||
Value: sarama.ByteEncoder(m), | ||
} | ||
|
||
// Remove sent spans from the batch even if they were not saved | ||
r.batchMtx.Lock() | ||
r.batch = r.batch[len(sendBatch):] | ||
r.batchMtx.Unlock() | ||
return nil | ||
} | ||
|
||
func (r *kafkaReporter) Close() error { | ||
return r.producer.Close() | ||
func (r *kafkaReporter) append(span *model.SpanModel) (newBatchSize int) { | ||
r.batchMtx.Lock() | ||
|
||
r.batch = append(r.batch, span) | ||
if len(r.batch) > r.maxBacklog { | ||
dispose := len(r.batch) - r.maxBacklog | ||
r.logger.Printf("backlog too long, disposing %d spans", dispose) | ||
r.batch = r.batch[dispose:] | ||
} | ||
newBatchSize = len(r.batch) | ||
|
||
r.batchMtx.Unlock() | ||
return | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jeqo are these good defaults?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, we are batching 100 spans, and the max backlog size is 1000 — bytes (?).
We have these defaults in java reporter: https://github.com/openzipkin/zipkin-reporter-java/blob/3f466e56012384ee685dd5cc91012129d312289b/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L81-L90
Max request size is 1MB (the default in the Kafka producer), but we've discussed somewhere reducing it to avoid overly large batches, so 500KB should be a good default.
Make sure to align producer properties to reporter ones, similar to https://github.com/openzipkin/zipkin-reporter-java/blob/3f466e56012384ee685dd5cc91012129d312289b/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L131-L140
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jeqo similar to http reporter:
When batch size reaches BatchSize, a collect will be triggered.
When batch size reaches MaxBacklog, spans from the beginning of the batch will be disposed.
So, the real batch size is dynamic, between 100 and 1000 (excluding clock-triggered sends).
Assuming the average span size is 500 bytes, the max request size will be 500KB, so 1000 should be a good default. The average span size is the thing we need to focus on.
Max request size is also 1MB in sarama kafka producer: https://github.com/Shopify/sarama/blob/master/config.go#L411