Skip to content

Commit

Permalink
Add limit to the unconfirmed messages (#378)
Browse files Browse the repository at this point in the history
* Add limit to the unconfirmed messages
  The producer is blocked when the limit is reached. It is possible to configure the
   limit using the producerOption.QueueSize setting.
   Closes: #373

* Close the TCP client connection in case CRC fails. The panic is removed.
  Closes: #368

---------

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Co-authored-by: Alberto Moretti <58828402+hiimjako@users.noreply.github.com>
  • Loading branch information
Gsantomaggio and hiimjako authored Jan 27, 2025
1 parent 51c3dca commit 7a3d780
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 22 deletions.
2 changes: 2 additions & 0 deletions perfTest/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
variableBody int
fixedBody int
batchSize int
queueSize int
subEntrySize int
compression string
exitOnError bool
Expand All @@ -58,6 +59,7 @@ func setupCli(baseCmd *cobra.Command) {
baseCmd.PersistentFlags().StringSliceVarP(&rabbitmqBrokerUrl, "uris", "", []string{stream.LocalhostUriConnection}, "Broker URLs")
baseCmd.PersistentFlags().IntVarP(&publishers, "publishers", "", 1, "Number of Publishers")
baseCmd.PersistentFlags().IntVarP(&batchSize, "batch-size", "", 200, "Batch Size, from 1 to 300")
baseCmd.PersistentFlags().IntVarP(&queueSize, "queue-size", "", 50_000, "Queue Size for the server back pressure = messages send - messages confirmed")
baseCmd.PersistentFlags().IntVarP(&subEntrySize, "sub-entry-size", "", 1, "SubEntry size, default 1. > 1 Enable the subEntryBatch")
baseCmd.PersistentFlags().StringVarP(&compression, "compression", "", "", "Compression for sub batching, none,gzip,lz4,snappy,zstd")
baseCmd.PersistentFlags().IntVarP(&consumers, "consumers", "", 1, "Number of Consumers")
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ const (
///
defaultWriteSocketBuffer = 8192
defaultReadSocketBuffer = 8192
defaultQueuePublisherSize = 10000
minQueuePublisherSize = 100
defaultQueuePublisherSize = 10_000
minQueuePublisherSize = 500
maxQueuePublisherSize = 1_000_000

minBatchSize = 1
Expand Down
4 changes: 3 additions & 1 deletion pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ func (coordinator *Coordinator) NewProducer(
coordinator.mutex.Lock()
defer coordinator.mutex.Unlock()
dynSize := 10000
queueSize := defaultQueuePublisherSize
tickerTime := defaultConfirmationTimeOut
if parameters != nil {
dynSize = parameters.BatchSize
tickerTime = parameters.ConfirmationTimeOut
queueSize = parameters.QueueSize
}

var lastId, err = coordinator.getNextProducerItem()
Expand All @@ -67,7 +69,7 @@ func (coordinator *Coordinator) NewProducer(
var producer = &Producer{id: lastId,
options: parameters,
mutex: &sync.RWMutex{},
unConfirmed: newUnConfirmed(),
unConfirmed: newUnConfirmed(queueSize),
confirmationTimeoutTicker: time.NewTicker(tickerTime),
doneTimeoutTicker: make(chan struct{}, 1),
status: open,
Expand Down
13 changes: 7 additions & 6 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,12 @@ type ProducerOptions struct {
// Deduplication is a feature that allows the producer to avoid sending duplicate messages to the stream.
// see: https://www.rabbitmq.com/blog/2021/07/28/rabbitmq-streams-message-deduplication for more information.
// Don't use it if you don't need the deduplication.
Name string
// Deprecated: starting from 1.5.0 the QueueSize is deprecated, and it will be removed in the next releases
// It is not used anymore given the dynamic batching
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
Name string
QueueSize int // Internal queue to handle back-pressure.
// Default value is enough high (See defaultQueuePublisherSize). You usually don't need to change it unless high memory usage is a concern.
// High value can increase the memory usage and deal with spikes in the traffic.
// Low value can reduce the memory usage but can increase the back-pressure on the server.

BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send()
// Deprecated: starting from 1.5.0 the SetBatchPublishingDelay is deprecated, and it will be removed in the next releases
// It is not used anymore given the dynamic batching
Expand All @@ -134,8 +136,7 @@ func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions {
return po
}

// Deprecated: starting from 1.5.0 the SetQueueSize is deprecated, and it will be removed in the next releases
// It is not used anymore given the dynamic batching
// SetQueueSize See ProducerOptions.QueueSize for more details
func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions {
po.QueueSize = size
return po
Expand Down
2 changes: 2 additions & 0 deletions pkg/stream/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ var _ = Describe("Streaming Producers", func() {
It(" sub-entry batching test Aggregation", func() {
producer, err := testEnvironment.NewProducer(testProducerStream,
NewProducerOptions().SetBatchPublishingDelay(100).
SetQueueSize(1000).
SetSubEntrySize(77))
Expect(err).NotTo(HaveOccurred())
messagesSequence := make([]*messageSequence, 201)
Expand Down Expand Up @@ -737,6 +738,7 @@ var _ = Describe("Streaming Producers", func() {
It("Sub Size Publish Confirm/Send", func() {
producer, err := testEnvironment.NewProducer(testProducerStream,
NewProducerOptions().SetBatchPublishingDelay(100).
SetQueueSize(500).
SetSubEntrySize(77))
Expect(err).NotTo(HaveOccurred())
var messagesConfirmed int32
Expand Down
40 changes: 31 additions & 9 deletions pkg/stream/producer_unconfirmed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stream

import (
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
"sync"
"time"
)
Expand All @@ -16,22 +17,30 @@ import (
type unConfirmed struct {
messages map[int64]*ConfirmationStatus
mutexMessageMap sync.RWMutex
maxSize int
blockSignal *sync.Cond
}

const DefaultUnconfirmedSize = 10_000

func newUnConfirmed() *unConfirmed {
func newUnConfirmed(maxSize int) *unConfirmed {
r := &unConfirmed{
messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize),
messages: make(map[int64]*ConfirmationStatus, maxSize),
mutexMessageMap: sync.RWMutex{},
maxSize: maxSize,
blockSignal: sync.NewCond(&sync.Mutex{}),
}
return r
}

func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID uint8) {
u.mutexMessageMap.Lock()
defer u.mutexMessageMap.Unlock()

if u.size() > u.maxSize {
logs.LogDebug("unConfirmed size: %d reached, producer blocked", u.maxSize)
u.blockSignal.L.Lock()
u.blockSignal.Wait()
u.blockSignal.L.Unlock()
}

u.mutexMessageMap.Lock()
for _, msgSeq := range messages {
u.messages[msgSeq.publishingId] = &ConfirmationStatus{
inserted: time.Now(),
Expand All @@ -40,8 +49,9 @@ func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID u
publishingId: msgSeq.publishingId,
confirmed: false,
}

}
u.mutexMessageMap.Unlock()

}

func (u *unConfirmed) link(from int64, to int64) {
Expand All @@ -67,14 +77,16 @@ func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus {
}
}
}
u.maybeUnLock()
return res

}

func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus {
u.mutexMessageMap.Lock()
defer u.mutexMessageMap.Unlock()
return u.extract(id, errorCode, false)
cs := u.extract(id, errorCode, false)
u.maybeUnLock()
return cs
}

func (u *unConfirmed) extract(id int64, errorCode uint16, confirmed bool) *ConfirmationStatus {
Expand Down Expand Up @@ -110,6 +122,7 @@ func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationS
res = append(res, v)
}
}
u.maybeUnLock()
return res
}

Expand All @@ -118,3 +131,12 @@ func (u *unConfirmed) size() int {
defer u.mutexMessageMap.Unlock()
return len(u.messages)
}

func (u *unConfirmed) maybeUnLock() {
if len(u.messages) < u.maxSize {
logs.LogDebug("unConfirmed size: %d back to normal, producer unblocked", u.maxSize)
u.blockSignal.L.Lock()
u.blockSignal.Broadcast()
u.blockSignal.L.Unlock()
}
}
8 changes: 4 additions & 4 deletions pkg/stream/server_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,9 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
if consumer.options.CRCCheck {
checkSum := crc32.ChecksumIEEE(bytesBuffer)
if crc != checkSum {
logs.LogError("Error during the checkSum, expected %d, checksum %d", crc, checkSum)
panic("Error during CRC")
} /// ???
logs.LogError("Error during the checkSum, expected %d, checksum %d. Tcp connection will be closed", crc, checkSum)
c.Close()
}
}

bufferReader := bytes.NewReader(bytesBuffer)
Expand Down Expand Up @@ -469,7 +469,7 @@ func (c *Client) handlePublishError(buffer *bufio.Reader) {
producer, err := c.coordinator.GetProducerById(publisherId)
if err != nil {
logs.LogWarn("producer id %d not found, publish error :%s", publisherId, lookErrorCode(code))
producer = &Producer{unConfirmed: newUnConfirmed()}
producer = &Producer{unConfirmed: newUnConfirmed(defaultQueuePublisherSize)}
} else {
unConfirmedMessage := producer.unConfirmed.extractWithError(publishingId, code)

Expand Down

0 comments on commit 7a3d780

Please sign in to comment.