Skip to content

Commit

Permalink
reduce the confirmation timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio committed Feb 23, 2024
1 parent 7d09302 commit 054bc5d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
14 changes: 8 additions & 6 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,15 +508,17 @@ func (producer *Producer) internalBatchSendProdId(messagesSequence []messageSequ

func (producer *Producer) flushUnConfirmedMessages() {
producer.mutex.Lock()
if producer.publishConfirm != nil {
for _, msg := range producer.unConfirmedMessages {
msg.confirmed = false
msg.err = ConnectionClosed
msg.errorCode = connectionCloseError

for _, msg := range producer.unConfirmedMessages {
msg.confirmed = false
msg.err = ConnectionClosed
msg.errorCode = connectionCloseError
if producer.publishConfirm != nil {
producer.publishConfirm <- []*ConfirmationStatus{msg}
delete(producer.unConfirmedMessages, msg.publishingId)
}
delete(producer.unConfirmedMessages, msg.publishingId)
}

producer.mutex.Unlock()
}

Expand Down
16 changes: 10 additions & 6 deletions pkg/stream/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ var _ = Describe("Streaming Producers", func() {

BeforeEach(func() {
producer = createProducer(
NewProducerOptions().SetBatchSize(BatchSize).SetSubEntrySize(SubEntrySize).SetCompression(Compression{}.None()),
NewProducerOptions().
SetBatchSize(BatchSize).
SetSubEntrySize(SubEntrySize).
SetConfirmationTimeOut(1*time.Second).
SetCompression(Compression{}.None()),
&messagesReceived,
testEnvironment,
testProducerStream,
Expand Down Expand Up @@ -229,7 +233,7 @@ var _ = Describe("Streaming Producers", func() {
}, 5*time.Second).Should(Equal(int32(14)),
"confirm should receive same messages send by producer")

Expect(len(producer.unConfirmedMessages)).To(Equal(0))
Expect(producer.lenUnConfirmed()).To(Equal(0))
Expect(producer.Close()).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -708,31 +712,31 @@ var _ = Describe("Streaming Producers", func() {
SetSubEntrySize(33).SetCompression(Compression{}.Gzip()))
Expect(err).NotTo(HaveOccurred())
testCompress(producerGZIP)
Expect(len(producerGZIP.unConfirmedMessages)).To(Equal(0))
Expect(producerGZIP.lenUnConfirmed()).To(Equal(0))
Expect(producerGZIP.Close()).NotTo(HaveOccurred())

producerLz4, err := testEnvironment.NewProducer(testProducerStream,
NewProducerOptions().SetBatchPublishingDelay(100).
SetSubEntrySize(55).SetCompression(Compression{}.Lz4()))
Expect(err).NotTo(HaveOccurred())
testCompress(producerLz4)
Expect(len(producerLz4.unConfirmedMessages)).To(Equal(0))
Expect(producerLz4.lenUnConfirmed()).To(Equal(0))
Expect(producerLz4.Close()).NotTo(HaveOccurred())

producerSnappy, err := testEnvironment.NewProducer(testProducerStream,
NewProducerOptions().SetBatchPublishingDelay(50).
SetSubEntrySize(666).SetCompression(Compression{}.Snappy()))
Expect(err).NotTo(HaveOccurred())
testCompress(producerSnappy)
Expect(len(producerSnappy.unConfirmedMessages)).To(Equal(0))
Expect(producerSnappy.lenUnConfirmed()).To(Equal(0))
Expect(producerSnappy.Close()).NotTo(HaveOccurred())

producerZstd, err := testEnvironment.NewProducer(testProducerStream,
NewProducerOptions().SetBatchPublishingDelay(200).
SetSubEntrySize(98).SetCompression(Compression{}.Zstd()))
Expect(err).NotTo(HaveOccurred())
testCompress(producerZstd)
Expect(len(producerZstd.unConfirmedMessages)).To(Equal(0))
Expect(producerZstd.lenUnConfirmed()).To(Equal(0))
Expect(producerZstd.Close()).NotTo(HaveOccurred())

})
Expand Down

0 comments on commit 054bc5d

Please sign in to comment.