Skip to content

Commit

Permalink
set and test: limit timeout to publish 15ms
Browse files Browse the repository at this point in the history
  • Loading branch information
TeNg committed Aug 10, 2023
1 parent 4b3bb15 commit 85a3803
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
6 changes: 5 additions & 1 deletion v2/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,15 @@ func TestListenMessageManual2(t *testing.T) {
log.Print("Listen message 1")
kclient.Listen(context.Background(), cmsg)
log.Print("Listen message 2")
i := 0
for msg := range cmsg {
// b, _ := json.MarshalIndent(msg, "", " ")
log.Printf("[%s] - part: %d - %s", msg.Topic, msg.Partition, string(msg.Body))
log.Print(msg)
msg.Commit()
i++
log.Print(i)
}
time.Sleep(10 * time.Second)
log.Print(i)
// time.Sleep(10 * time.Second)
}
6 changes: 2 additions & 4 deletions v2/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func (k *Client) NewPublisher() error {
}
w := &kafka.Writer{
Addr: kafka.TCP(k.addrs...),
Balancer: &kafka.LeastBytes{},
BatchTimeout: 10 * time.Millisecond,
Balancer: &kafka.RoundRobin{},
BatchTimeout: 15 * time.Millisecond,
}

// if w == nil {
Expand All @@ -45,13 +45,11 @@ func (k *Client) Publish(ctx context.Context, topic string, msg interface{}) err
if err != nil {
return errors.New("message of data sender can not marshal")
}
now := time.Now()
err = k.writer.WriteMessages(ctx, kafka.Message{
Topic: topic,
Key: []byte(hashMessage(dataSender)),
Value: dataSender,
})
log.Print("x1 ", time.Since(now))
return err
}

Expand Down
2 changes: 1 addition & 1 deletion v2/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestSendMessage2(t *testing.T) {
// }
// }
// }()
for i := 0; i < 130; i++ {
for i := 0; i < 300; i++ {
err := kclient.Publish(context.TODO(), "topic-6x", map[string]interface{}{
"meta": "tester2",
"index": i,
Expand Down

0 comments on commit 85a3803

Please sign in to comment.