diff --git a/perfTest/cmd/commands.go b/perfTest/cmd/commands.go
index 9fceccc8..951c6dab 100644
--- a/perfTest/cmd/commands.go
+++ b/perfTest/cmd/commands.go
@@ -39,6 +39,7 @@ var (
 	variableBody int
 	fixedBody    int
 	batchSize    int
+	queueSize    int
 	subEntrySize int
 	compression  string
 	exitOnError  bool
@@ -58,6 +59,7 @@ func setupCli(baseCmd *cobra.Command) {
 	baseCmd.PersistentFlags().StringSliceVarP(&rabbitmqBrokerUrl, "uris", "", []string{stream.LocalhostUriConnection}, "Broker URLs")
 	baseCmd.PersistentFlags().IntVarP(&publishers, "publishers", "", 1, "Number of Publishers")
 	baseCmd.PersistentFlags().IntVarP(&batchSize, "batch-size", "", 200, "Batch Size, from 1 to 300")
+	baseCmd.PersistentFlags().IntVarP(&queueSize, "queue-size", "", 50_000, "Queue size for the server back pressure = messages sent - messages confirmed")
 	baseCmd.PersistentFlags().IntVarP(&subEntrySize, "sub-entry-size", "", 1, "SubEntry size, default 1. > 1 Enable the subEntryBatch")
 	baseCmd.PersistentFlags().StringVarP(&compression, "compression", "", "", "Compression for sub batching, none,gzip,lz4,snappy,zstd")
 	baseCmd.PersistentFlags().IntVarP(&consumers, "consumers", "", 1, "Number of Consumers")
diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go
index a0979ef4..1b748a92 100644
--- a/pkg/stream/constants.go
+++ b/pkg/stream/constants.go
@@ -96,8 +96,8 @@ const (
 	///
 	defaultWriteSocketBuffer  = 8192
 	defaultReadSocketBuffer   = 8192
-	defaultQueuePublisherSize = 10000
-	minQueuePublisherSize     = 100
+	defaultQueuePublisherSize = 10_000
+	minQueuePublisherSize     = 500
 	maxQueuePublisherSize     = 1_000_000
 
 	minBatchSize = 1
diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go
index 4d8b6f62..1060a66c 100644
--- a/pkg/stream/coordinator.go
+++ b/pkg/stream/coordinator.go
@@ -54,10 +54,12 @@ func (coordinator *Coordinator) NewProducer(
 	coordinator.mutex.Lock()
 	defer coordinator.mutex.Unlock()
 	dynSize := 10000
+	queueSize := defaultQueuePublisherSize
 	tickerTime := defaultConfirmationTimeOut
 	if parameters != nil {
 		dynSize = parameters.BatchSize
 		tickerTime = parameters.ConfirmationTimeOut
+		queueSize = parameters.QueueSize
 	}
 
 	var lastId, err = coordinator.getNextProducerItem()
@@ -67,7 +69,7 @@
 	var producer = &Producer{id: lastId, options: parameters,
 		mutex:                     &sync.RWMutex{},
-		unConfirmed:               newUnConfirmed(),
+		unConfirmed:               newUnConfirmed(queueSize),
 		confirmationTimeoutTicker: time.NewTicker(tickerTime),
 		doneTimeoutTicker:         make(chan struct{}, 1),
 		status:                    open,
diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go
index 9c533d83..e7441c85 100644
--- a/pkg/stream/producer.go
+++ b/pkg/stream/producer.go
@@ -105,10 +105,12 @@ type ProducerOptions struct {
 	// Deduplication is a feature that allows the producer to avoid sending duplicate messages to the stream.
 	// see: https://www.rabbitmq.com/blog/2021/07/28/rabbitmq-streams-message-deduplication for more information.
 	// Don't use it if you don't need the deduplication.
-	Name string
-	// Deprecated: starting from 1.5.0 the QueueSize is deprecated, and it will be removed in the next releases
-	// It is not used anymore given the dynamic batching
-	QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
+	Name      string
+	QueueSize int // Internal queue to handle back-pressure.
+	// The default value is high enough (see defaultQueuePublisherSize); you usually don't need to change it unless high memory usage is a concern.
+	// A high value can increase the memory usage but helps to deal with traffic spikes.
+	// A low value can reduce the memory usage but can increase the back-pressure on the server.
+
 	BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send()
 	// Deprecated: starting from 1.5.0 the SetBatchPublishingDelay is deprecated, and it will be removed in the next releases
 	// It is not used anymore given the dynamic batching
@@ -134,8 +136,7 @@ func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions {
 	return po
 }
 
-// Deprecated: starting from 1.5.0 the SetQueueSize is deprecated, and it will be removed in the next releases
-// It is not used anymore given the dynamic batching
+// SetQueueSize sets the internal queue size. See ProducerOptions.QueueSize for more details.
 func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions {
 	po.QueueSize = size
 	return po
diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go
index 45b480bb..fec98462 100644
--- a/pkg/stream/producer_test.go
+++ b/pkg/stream/producer_test.go
@@ -654,6 +654,7 @@ var _ = Describe("Streaming Producers", func() {
 	It(" sub-entry batching test Aggregation", func() {
 		producer, err := testEnvironment.NewProducer(testProducerStream,
 			NewProducerOptions().SetBatchPublishingDelay(100).
+				SetQueueSize(1000).
 				SetSubEntrySize(77))
 		Expect(err).NotTo(HaveOccurred())
 		messagesSequence := make([]*messageSequence, 201)
@@ -737,6 +738,7 @@ var _ = Describe("Streaming Producers", func() {
 	It("Sub Size Publish Confirm/Send", func() {
 		producer, err := testEnvironment.NewProducer(testProducerStream,
 			NewProducerOptions().SetBatchPublishingDelay(100).
+				SetQueueSize(500).
 				SetSubEntrySize(77))
 		Expect(err).NotTo(HaveOccurred())
 		var messagesConfirmed int32
diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go
index 0cc0f606..cad6c27d 100644
--- a/pkg/stream/producer_unconfirmed.go
+++ b/pkg/stream/producer_unconfirmed.go
@@ -1,6 +1,7 @@
 package stream
 
 import (
+	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
 	"sync"
 	"time"
 )
@@ -16,22 +17,30 @@ import (
 type unConfirmed struct {
 	messages        map[int64]*ConfirmationStatus
 	mutexMessageMap sync.RWMutex
+	maxSize         int
+	blockSignal     *sync.Cond
 }
 
-const DefaultUnconfirmedSize = 10_000
-
-func newUnConfirmed() *unConfirmed {
+func newUnConfirmed(maxSize int) *unConfirmed {
 	r := &unConfirmed{
-		messages:        make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize),
+		messages:        make(map[int64]*ConfirmationStatus, maxSize),
 		mutexMessageMap: sync.RWMutex{},
+		maxSize:         maxSize,
+		blockSignal:     sync.NewCond(&sync.Mutex{}),
 	}
 	return r
 }
 
 func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID uint8) {
-	u.mutexMessageMap.Lock()
-	defer u.mutexMessageMap.Unlock()
+	if u.size() > u.maxSize {
+		logs.LogDebug("unConfirmed size: %d reached, producer blocked", u.maxSize)
+		u.blockSignal.L.Lock()
+		u.blockSignal.Wait()
+		u.blockSignal.L.Unlock()
+	}
+
+	u.mutexMessageMap.Lock()
 	for _, msgSeq := range messages {
 		u.messages[msgSeq.publishingId] = &ConfirmationStatus{
 			inserted:     time.Now(),
@@ -40,8 +49,9 @@ func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID u
 			publishingId: msgSeq.publishingId,
 			confirmed:    false,
 		}
-	}
+	}
+	u.mutexMessageMap.Unlock()
 
 }
 
 func (u *unConfirmed) link(from int64, to int64) {
@@ -67,14 +77,16 @@ func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus {
 			}
 		}
 	}
+	u.maybeUnLock()
 	return res
-
 }
 
 func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus {
 	u.mutexMessageMap.Lock()
 	defer u.mutexMessageMap.Unlock()
-	return u.extract(id, errorCode, false)
+	cs := u.extract(id, errorCode, false)
+	u.maybeUnLock()
+	return cs
 }
 
 func (u *unConfirmed) extract(id int64, errorCode uint16, confirmed bool) *ConfirmationStatus {
@@ -110,6 +122,7 @@ func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationS
 			res = append(res, v)
 		}
 	}
+	u.maybeUnLock()
 	return res
 }
 
@@ -118,3 +131,12 @@ func (u *unConfirmed) size() int {
 	defer u.mutexMessageMap.Unlock()
 	return len(u.messages)
 }
+
+func (u *unConfirmed) maybeUnLock() {
+	if len(u.messages) < u.maxSize {
+		logs.LogDebug("unConfirmed size: %d back to normal, producer unblocked", u.maxSize)
+		u.blockSignal.L.Lock()
+		u.blockSignal.Broadcast()
+		u.blockSignal.L.Unlock()
+	}
+}
diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go
index f6381446..d3a323b3 100644
--- a/pkg/stream/server_frame.go
+++ b/pkg/stream/server_frame.go
@@ -350,9 +350,9 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
 	if consumer.options.CRCCheck {
 		checkSum := crc32.ChecksumIEEE(bytesBuffer)
 		if crc != checkSum {
-			logs.LogError("Error during the checkSum, expected %d, checksum %d", crc, checkSum)
-			panic("Error during CRC")
-		} /// ???
+			logs.LogError("Error during the checkSum, expected %d, checksum %d. The TCP connection will be closed", crc, checkSum)
+			c.Close()
+		}
 	}
 
 	bufferReader := bytes.NewReader(bytesBuffer)
@@ -469,7 +469,7 @@ func (c *Client) handlePublishError(buffer *bufio.Reader) {
 	producer, err := c.coordinator.GetProducerById(publisherId)
 	if err != nil {
 		logs.LogWarn("producer id %d not found, publish error :%s", publisherId, lookErrorCode(code))
-		producer = &Producer{unConfirmed: newUnConfirmed()}
+		producer = &Producer{unConfirmed: newUnConfirmed(defaultQueuePublisherSize)}
 	} else {
 		unConfirmedMessage := producer.unConfirmed.extractWithError(publishingId, code)
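For context, this is how the restored option is meant to be used from the client side. A minimal producer sketch follows; the stream name, message count, and connection defaults are illustrative and not part of this change, while NewEnvironment, DeclareStream, NewProducer, SetQueueSize, NotifyPublishConfirmation, and Send are the library's public API. Send keeps accepting messages until the number of unconfirmed messages reaches QueueSize, then blocks until enough confirmations arrive.

package main

import (
	"fmt"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func main() {
	// Connect with the default options (localhost:5552, guest/guest).
	env, err := stream.NewEnvironment(stream.NewEnvironmentOptions())
	if err != nil {
		panic(err)
	}
	defer env.Close()

	streamName := "queue-size-demo" // hypothetical stream name
	if err := env.DeclareStream(streamName, &stream.StreamOptions{}); err != nil {
		panic(err)
	}

	// QueueSize caps the number of unconfirmed messages: Send() blocks once
	// the limit is reached and resumes as confirmations drain the queue.
	producer, err := env.NewProducer(streamName,
		stream.NewProducerOptions().
			SetQueueSize(10_000).
			SetBatchSize(200))
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// Drain publish confirmations so the unconfirmed queue empties over time.
	confirms := producer.NotifyPublishConfirmation()
	go func() {
		for batch := range confirms {
			fmt.Printf("received %d confirmations\n", len(batch))
		}
	}()

	for i := 0; i < 100_000; i++ {
		if err := producer.Send(amqp.NewMessage([]byte(fmt.Sprintf("message %d", i)))); err != nil {
			panic(err)
		}
	}
}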
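The blocking itself lives in producer_unconfirmed.go: addFromSequences waits on a sync.Cond when the unConfirmed map is at capacity, and maybeUnLock broadcasts once confirmations shrink it below the limit. Below is a minimal, self-contained sketch of that gate pattern, not the library code: the gate/newGate names and the plain counter (instead of the map of ConfirmationStatus) are simplifications, and the condition is re-checked in a loop while holding the condition's lock.

package main

import (
	"fmt"
	"sync"
	"time"
)

// gate mimics the back-pressure added to unConfirmed: publishers block once
// the number of pending (unconfirmed) items reaches maxSize and resume when
// confirmations drain the count below the limit.
type gate struct {
	mu      sync.Mutex
	cond    *sync.Cond
	pending int
	maxSize int
}

func newGate(maxSize int) *gate {
	g := &gate{maxSize: maxSize}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// add blocks while the pending count is at the limit, then records n items.
func (g *gate) add(n int) {
	g.mu.Lock()
	for g.pending >= g.maxSize {
		g.cond.Wait()
	}
	g.pending += n
	g.mu.Unlock()
}

// confirm releases n items and wakes any blocked publisher.
func (g *gate) confirm(n int) {
	g.mu.Lock()
	g.pending -= n
	g.mu.Unlock()
	g.cond.Broadcast()
}

func main() {
	g := newGate(3)

	// Simulated publish confirmations arriving from the broker.
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(50 * time.Millisecond)
			g.confirm(1)
		}
	}()

	for i := 0; i < 5; i++ {
		g.add(1) // blocks once three messages are pending
		fmt.Println("sent message", i)
	}
}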