Skip to content

Commit

Permalink
aggregate responses
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio committed Jan 10, 2025
1 parent 6a1e0f9 commit 3374ca6
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 82 deletions.
18 changes: 11 additions & 7 deletions pkg/stream/blocking_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type BlockingQueue[T any] struct {
queue chan T
status int32
capacity int
lastUpdate time.Time
lastUpdate int64
}

// NewBlockingQueue initializes a new BlockingQueue with the given capacity
Expand All @@ -30,9 +30,8 @@ func (bq *BlockingQueue[T]) Enqueue(item T) error {
if bq.IsStopped() {
return ErrBlockingQueueStopped
}
bq.lastUpdate = time.Now()
atomic.StoreInt64(&bq.lastUpdate, time.Now().UnixNano())
bq.queue <- item

return nil
}

Expand All @@ -42,6 +41,7 @@ func (bq *BlockingQueue[T]) Dequeue(timeout time.Duration) T {
var zeroValue T // Zero value of type T
return zeroValue
}

select {
case item, ok := <-bq.queue:
if !ok {
Expand All @@ -61,6 +61,10 @@ func (bq *BlockingQueue[T]) Process(f func(T)) {
}
}

func (bq *BlockingQueue[T]) GetChannel() chan T {
return bq.queue
}

func (bq *BlockingQueue[T]) Size() int {
return len(bq.queue)
}
Expand All @@ -70,10 +74,10 @@ func (bq *BlockingQueue[T]) IsEmpty() bool {
}

func (bq *BlockingQueue[T]) IsReadyToSend() bool {
if bq.lastUpdate.IsZero() {
return true
}
return time.Since(bq.lastUpdate) > 10*time.Millisecond && len(bq.queue) == 0

millis := time.Since(time.Unix(0, atomic.LoadInt64(&bq.lastUpdate))).Milliseconds()

return millis > 10 && len(bq.queue) == 0
}

// Stop stops the queue from accepting new items
Expand Down
3 changes: 2 additions & 1 deletion pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ const (
///
defaultSocketCallTimeout = 10 * time.Second

defaultHeartbeat = 60 * time.Second
defaultHeartbeat = 60 * time.Second
defaultMaxFrameSize = 1048574

//
LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f"
Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ var _ = Describe("Environment test", func() {
TCPParameters: &TCPParameters{
tlsConfig: nil,
RequestedHeartbeat: defaultHeartbeat,
RequestedMaxFrameSize: 1048574,
RequestedMaxFrameSize: defaultMaxFrameSize,
WriteBuffer: 100,
ReadBuffer: 200,
NoDelay: false,
Expand Down
74 changes: 19 additions & 55 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,19 @@ func (producer *Producer) processPendingSequencesQueue() {
maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize
var avarage = 0
iterations := 0
// the buffer is initialized with the size of the header
go func() {
sequenceToSend := make([]*messageSequence, 0)
totalBufferToSend := initBufferPublishSize
producer.pendingSequencesQueue.Process(func(msg *messageSequence) {
for msg := range producer.pendingSequencesQueue.GetChannel() {

var lastError error
// the dequeue is blocking with a timeout of 500ms
// as soon as a message is available the Dequeue will be unblocked
//msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500)
if producer.pendingSequencesQueue.IsStopped() {
break
}
if msg != nil {
// There is something in the queue.Checks the buffer is still less than the maxFrame
totalBufferToSend += msg.unCompressedSize
Expand All @@ -304,13 +312,17 @@ func (producer *Producer) processPendingSequencesQueue() {
sequenceToSend = sequenceToSend[:0]
totalBufferToSend = initBufferPublishSize
}

sequenceToSend = append(sequenceToSend, msg)
}
if producer.pendingSequencesQueue.IsReadyToSend() || len(sequenceToSend) >= producer.options.BatchSize {

// if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending
// the messages during the checks of the buffer. In this case
if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize {
if len(sequenceToSend) > 0 {
avarage += len(sequenceToSend)
iterations++
if iterations > 10000 {
if iterations > 100000 {
logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations)
avarage = 0
iterations = 0
Expand All @@ -324,58 +336,10 @@ func (producer *Producer) processPendingSequencesQueue() {
if lastError != nil {
logs.LogError("error during sending messages: %s", lastError)
}
}

})
logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id)
}()
logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id)
// the buffer is initialized with the size of the header

// for {
// var lastError error
// // the dequeue is blocking with a timeout of 500ms
// // as soon as a message is available the Dequeue will be unblocked
// msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500)
// if producer.pendingSequencesQueue.IsStopped() {
// break
// }
//
// if msg != nil {
// // There is something in the queue.Checks the buffer is still less than the maxFrame
// totalBufferToSend += msg.unCompressedSize
// if totalBufferToSend > maxFrame {
// // if the totalBufferToSend is greater than the requestedMaxFrameSize
// // the producer sends the messages and reset the buffer
// lastError = producer.internalBatchSend(sequenceToSend)
// sequenceToSend = sequenceToSend[:0]
// totalBufferToSend = initBufferPublishSize
// }
//
// sequenceToSend = append(sequenceToSend, msg)
// }
//
// // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending
// // the messages during the checks of the buffer. In this case
// if producer.pendingSequencesQueue.IsReadyToSend() || len(sequenceToSend) >= producer.options.BatchSize {
// if len(sequenceToSend) > 0 {
// avarage += len(sequenceToSend)
// iterations++
// if iterations > 10000 {
// logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations)
// avarage = 0
// iterations = 0
// }
//
// lastError = producer.internalBatchSend(sequenceToSend)
// sequenceToSend = sequenceToSend[:0]
// totalBufferToSend += initBufferPublishSize
// }
// }
// if lastError != nil {
// logs.LogError("error during sending messages: %s", lastError)
// }
// }
// logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id)
//}()
}

func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 {
Expand Down Expand Up @@ -420,7 +384,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
}
producer.unConfirmed.addFromSequence(messageSeq, producer.GetID())

if len(messageSeq.messageBytes) > producer.options.client.getTuneState().requestedMaxFrameSize {
if len(messageSeq.messageBytes) > defaultMaxFrameSize {
tooLarge := producer.unConfirmed.extractWithError(messageSeq.publishingId, responseCodeFrameTooLarge)
producer.sendConfirmationStatus([]*ConfirmationStatus{tooLarge})
return FrameTooLarge
Expand All @@ -440,7 +404,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
// BatchSend is not affected by the BatchSize and BatchPublishingDelay options.
// returns an error if the message could not be sent for marshal problems or if the buffer is too large
func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error {
maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize
maxFrame := defaultMaxFrameSize
var messagesSequence = make([]*messageSequence, 0)
totalBufferToSend := 0
for _, batchMessage := range batchMessages {
Expand Down
5 changes: 3 additions & 2 deletions pkg/stream/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ var _ = Describe("Streaming Producers", func() {

})

It("Smart Send/Close", func() {
It("Smart Send/Close", Focus, func() {
producer, err := testEnvironment.NewProducer(testProducerStream, nil)
Expect(err).NotTo(HaveOccurred())
var messagesReceived int32
Expand All @@ -331,9 +331,10 @@ var _ = Describe("Streaming Producers", func() {
Expect(producer.Send(amqp.NewMessage(s))).NotTo(HaveOccurred())
}

time.Sleep(1500 * time.Millisecond)
Eventually(func() int32 {
return atomic.LoadInt32(&messagesReceived)
}, 5*time.Second).Should(Equal(int32(101)),
}, 5*time.Second).WithPolling(300*time.Millisecond).Should(Equal(int32(101)),
"confirm should receive same messages Send by producer")

Expect(producer.lenUnConfirmed()).To(Equal(0))
Expand Down
15 changes: 13 additions & 2 deletions pkg/stream/producer_unconfirmed.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,21 @@ func (u *unConfirmed) link(from int64, to int64) {
}
}

func (u *unConfirmed) extractWithConfirm(id int64) *ConfirmationStatus {
func (u *unConfirmed) extractWithConfirms(id []int64) []*ConfirmationStatus {
u.mutex.Lock()
defer u.mutex.Unlock()
return u.extract(id, 0, true)
var res []*ConfirmationStatus
for _, v := range id {
m := u.extract(v, 0, true)
if m != nil {
res = append(res, m)
if m.linkedTo != nil {
res = append(res, m.linkedTo...)
}
}
}
return res

}

func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus {
Expand Down
30 changes: 16 additions & 14 deletions pkg/stream/server_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,26 +255,28 @@ func (c *Client) handleConfirm(readProtocol *ReaderProtocol, r *bufio.Reader) in
// even the producer is not found we need to read the publishingId
// to empty the buffer.
// The producer here could not exist because the producer is closed before the confirmations are received
var unConfirmedRecv []*ConfirmationStatus
//var unConfirmedRecv []*ConfirmationStatus
var arraySeq []int64
for publishingIdCount != 0 {
seq := readInt64(r)
if producerFound {

m := producer.unConfirmed.extractWithConfirm(seq)
if m != nil {
unConfirmedRecv = append(unConfirmedRecv, m)

// in case of sub-batch entry the client receives only
// one publishingId (or sequence)
// so the other messages are confirmed using the linkedTo
unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...)
}
}
arraySeq = append(arraySeq, seq)
//if producerFound {
// m := producer.unConfirmed.extractWithConfirm(seq)
// if m != nil {
// unConfirmedRecv = append(unConfirmedRecv, m)
//
// // in case of sub-batch entry the client receives only
// // one publishingId (or sequence)
// // so the other messages are confirmed using the linkedTo
// unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...)
// }
//}

publishingIdCount--
}

if producerFound {
producer.sendConfirmationStatus(unConfirmedRecv)
producer.sendConfirmationStatus(producer.unConfirmed.extractWithConfirms(arraySeq))
}

return 0
Expand Down

0 comments on commit 3374ca6

Please sign in to comment.