Skip to content

Commit

Permalink
fix kafka issue consumer group IBM/sarama#1192
Browse files Browse the repository at this point in the history
  • Loading branch information
TeNg committed Dec 26, 2022
1 parent 20f5a6a commit 393632a
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 31 deletions.
74 changes: 43 additions & 31 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (ps *Client) onAsyncSubscribe(topics []*Topic, numberPuller int, buf chan M

for i := 0; i < numberPuller; i++ {
go func() {
defer consumer.wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
Expand All @@ -238,6 +239,11 @@ func (ps *Client) onAsyncSubscribe(topics []*Topic, numberPuller int, buf chan M
consumer.lock <- true
break
}
log.Print("ssssss")
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
consumer.wg = &sync.WaitGroup{}
consumer.wg.Add(numberPuller)
}
Expand Down Expand Up @@ -298,41 +304,47 @@ func (consumer *ConsumerGroupHandle) ConsumeClaim(session sarama.ConsumerGroupSe
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for m := range claim.Messages() {
// log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s, partion = %v", string(message.Value), message.Timestamp, message.Topic, message.Partition)
// messageHandler(message, consumer.bufMessage, session)
if len(m.Value) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
return errors.New("message error")
}
msg := Message{
Topic: m.Topic,
Body: m.Value,
Offset: m.Offset,
Partition: int(m.Partition),
Timestamp: m.Timestamp.Unix(),
}
if len(m.Headers) != 0 {
headers := map[string]string{}
for _, header := range m.Headers {
headers[string(header.Key)] = string(header.Value)
for {
select {
case m := <-claim.Messages():
if len(m.Value) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
return errors.New("message error")
}
msg := Message{
Topic: m.Topic,
Body: m.Value,
Offset: m.Offset,
Partition: int(m.Partition),
Timestamp: m.Timestamp.Unix(),
}
if len(m.Headers) != 0 {
headers := map[string]string{}
for _, header := range m.Headers {
headers[string(header.Key)] = string(header.Value)
}
msg.Headers = headers
}
if consumer.autoCommit[m.Topic] {
session.MarkOffset(m.Topic, m.Partition, m.Offset, "")
session.MarkMessage(m, "")
consumer.bufMessage <- msg
msg.Commit = func() {}
continue
}
msg.Commit = func() {
session.MarkOffset(m.Topic, m.Partition, m.Offset, "")
session.MarkMessage(m, "")
}
msg.Headers = headers
}
if consumer.autoCommit[m.Topic] {
session.MarkOffset(m.Topic, m.Partition, m.Offset, "")
session.MarkMessage(m, "")
consumer.bufMessage <- msg
msg.Commit = func() {}
continue
}
msg.Commit = func() {
session.MarkOffset(m.Topic, m.Partition, m.Offset, "")
session.MarkMessage(m, "")

// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
case <-session.Context().Done():
return nil
}
consumer.bufMessage <- msg
}
return nil
}

func (ps *Client) Close() error {
Expand Down
15 changes: 15 additions & 0 deletions pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,18 @@ func BenchmarkPublishMessages10000z(b *testing.B) {
func BenchmarkSubscribeSimpleManualCommit(b *testing.B) {
testSubscribeSimpleManualCommit("0.0.0.0:9092", "CG-1", "topic-5")
}

func TestSimplePublishAndSubscibeResub(t *testing.T) {
lock := make(chan bool)
go t.Run("subscribe 1", func(t *testing.T) {
testSubscribeSimpleManualCommit("0.0.0.0:9092", "CG-0", "topic-3")
})
go t.Run("subscribe 2", func(t *testing.T) {
testSubscribeSimpleManualCommit("0.0.0.0:9092", "CG-1", "topic-3")
})
time.Sleep(5 * time.Second)
t.Run("publish to topic", func(t *testing.T) {
testPublishMessages("0.0.0.0:9092", "topic-3", 20)
})
<-lock
}

0 comments on commit 393632a

Please sign in to comment.