From 054bc5df1f54db1ed1630c6bf328d746513a38fa Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 23 Feb 2024 10:50:50 +0100 Subject: [PATCH] reduce the confirmation timeout Signed-off-by: Gabriele Santomaggio --- pkg/stream/producer.go | 14 ++++++++------ pkg/stream/producer_test.go | 16 ++++++++++------ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 8854de95..37052d2a 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -508,15 +508,17 @@ func (producer *Producer) internalBatchSendProdId(messagesSequence []messageSequ func (producer *Producer) flushUnConfirmedMessages() { producer.mutex.Lock() - if producer.publishConfirm != nil { - for _, msg := range producer.unConfirmedMessages { - msg.confirmed = false - msg.err = ConnectionClosed - msg.errorCode = connectionCloseError + + for _, msg := range producer.unConfirmedMessages { + msg.confirmed = false + msg.err = ConnectionClosed + msg.errorCode = connectionCloseError + if producer.publishConfirm != nil { producer.publishConfirm <- []*ConfirmationStatus{msg} - delete(producer.unConfirmedMessages, msg.publishingId) } + delete(producer.unConfirmedMessages, msg.publishingId) } + producer.mutex.Unlock() } diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index f9188e1d..2c811903 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -138,7 +138,11 @@ var _ = Describe("Streaming Producers", func() { BeforeEach(func() { producer = createProducer( - NewProducerOptions().SetBatchSize(BatchSize).SetSubEntrySize(SubEntrySize).SetCompression(Compression{}.None()), + NewProducerOptions(). + SetBatchSize(BatchSize). + SetSubEntrySize(SubEntrySize). + SetConfirmationTimeOut(1*time.Second). + SetCompression(Compression{}.None()), &messagesReceived, testEnvironment, testProducerStream, @@ -229,7 +233,7 @@ var _ = Describe("Streaming Producers", func() { }, 5*time.Second).Should(Equal(int32(14)), "confirm should receive same messages send by producer") - Expect(len(producer.unConfirmedMessages)).To(Equal(0)) + Expect(producer.lenUnConfirmed()).To(Equal(0)) Expect(producer.Close()).NotTo(HaveOccurred()) }) @@ -708,7 +712,7 @@ var _ = Describe("Streaming Producers", func() { SetSubEntrySize(33).SetCompression(Compression{}.Gzip())) Expect(err).NotTo(HaveOccurred()) testCompress(producerGZIP) - Expect(len(producerGZIP.unConfirmedMessages)).To(Equal(0)) + Expect(producerGZIP.lenUnConfirmed()).To(Equal(0)) Expect(producerGZIP.Close()).NotTo(HaveOccurred()) producerLz4, err := testEnvironment.NewProducer(testProducerStream, @@ -716,7 +720,7 @@ var _ = Describe("Streaming Producers", func() { SetSubEntrySize(55).SetCompression(Compression{}.Lz4())) Expect(err).NotTo(HaveOccurred()) testCompress(producerLz4) - Expect(len(producerLz4.unConfirmedMessages)).To(Equal(0)) + Expect(producerLz4.lenUnConfirmed()).To(Equal(0)) Expect(producerLz4.Close()).NotTo(HaveOccurred()) producerSnappy, err := testEnvironment.NewProducer(testProducerStream, @@ -724,7 +728,7 @@ var _ = Describe("Streaming Producers", func() { SetSubEntrySize(666).SetCompression(Compression{}.Snappy())) Expect(err).NotTo(HaveOccurred()) testCompress(producerSnappy) - Expect(len(producerSnappy.unConfirmedMessages)).To(Equal(0)) + Expect(producerSnappy.lenUnConfirmed()).To(Equal(0)) Expect(producerSnappy.Close()).NotTo(HaveOccurred()) producerZstd, err := testEnvironment.NewProducer(testProducerStream, @@ -732,7 +736,7 @@ var _ = Describe("Streaming Producers", func() { SetSubEntrySize(98).SetCompression(Compression{}.Zstd())) Expect(err).NotTo(HaveOccurred()) testCompress(producerZstd) - Expect(len(producerZstd.unConfirmedMessages)).To(Equal(0)) + Expect(producerZstd.lenUnConfirmed()).To(Equal(0)) Expect(producerZstd.Close()).NotTo(HaveOccurred()) })