
Commit

restore timeout
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Gsantomaggio committed Jan 10, 2025
1 parent 3374ca6 commit d9ac9ac
Showing 1 changed file with 54 additions and 35 deletions.
89 changes: 54 additions & 35 deletions pkg/stream/producer.go
@@ -101,8 +101,7 @@ type ProducerOptions struct {
     // It is not used anymore given the dynamic batching
     QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
     BatchSize int // It is the batch-unCompressedSize aggregation, low value reduces the latency, high value increases the throughput. Valid only for the method Send()
-    // Deprecated: starting from 1.5.0 the BatchPublishingDelay is deprecated, and it will be removed in the next releases
-    // It is not used anymore given the dynamic batching
+
     BatchPublishingDelay int // Timeout within which the aggregation sends a batch of messages. Valid only for the method Send()
     // Size of sub Entry, to aggregate more subEntry using one publishing id
     SubEntrySize int
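
With the deprecation notice removed, BatchPublishingDelay is meaningful again for Send(). A minimal usage sketch follows, assuming the client's usual public entry points (NewEnvironment, NewProducer, NewProducerOptions and its fluent setters) and module path, none of which are part of this diff:

package main

import (
    "log"

    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func main() {
    // Assumed entry points; BatchPublishingDelay is the only option of interest here.
    env, err := stream.NewEnvironment(stream.NewEnvironmentOptions())
    if err != nil {
        log.Fatal(err)
    }
    defer env.Close()

    // Assumes the stream "my-stream" already exists.
    // A non-zero delay (milliseconds) opts into the ticker-based flush restored
    // by this commit; with 0 the restored canSend check keeps the flush-on-empty-queue path.
    producer, err := env.NewProducer("my-stream",
        stream.NewProducerOptions().
            SetBatchSize(100).
            SetBatchPublishingDelay(50))
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()
}

The behavioural difference between a zero and a non-zero delay follows from the canSend check restored in the hunk below.
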
@@ -289,57 +288,77 @@ func (producer *Producer) processPendingSequencesQueue() {
     maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize
     var avarage = 0
     iterations := 0
     // the buffer is initialized with the size of the header
     go func() {
+        var ticker = time.NewTicker(time.Duration(producer.options.BatchPublishingDelay) * time.Millisecond)
+        defer ticker.Stop()
+
         sequenceToSend := make([]*messageSequence, 0)
         totalBufferToSend := initBufferPublishSize
-        for msg := range producer.pendingSequencesQueue.GetChannel() {
-            var lastError error
-            // the dequeue is blocking with a timeout of 500ms
-            // as soon as a message is available the Dequeue will be unblocked
-            //msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500)
-            if producer.pendingSequencesQueue.IsStopped() {
-                break
-            }
-            if msg != nil {
-                // There is something in the queue. Check the buffer is still less than the maxFrame
-                totalBufferToSend += msg.unCompressedSize
-                if totalBufferToSend > maxFrame {
-                    // if the totalBufferToSend is greater than the requestedMaxFrameSize
-                    // the producer sends the messages and resets the buffer
-                    lastError = producer.internalBatchSend(sequenceToSend)
-                    sequenceToSend = sequenceToSend[:0]
-                    totalBufferToSend = initBufferPublishSize
-                }
-
-                sequenceToSend = append(sequenceToSend, msg)
-            }
-
-            // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending
-            // the messages during the checks of the buffer. In this case
-            if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize {
-                if len(sequenceToSend) > 0 {
-                    avarage += len(sequenceToSend)
-                    iterations++
-                    if iterations > 100000 {
-                        logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations)
-                        avarage = 0
-                        iterations = 0
-                    }
-
-                    lastError = producer.internalBatchSend(sequenceToSend)
-                    sequenceToSend = sequenceToSend[:0]
-                    totalBufferToSend += initBufferPublishSize
-                }
-            }
+        var lastError error
+        for {
+            select {
+
+            case msg, ok := <-producer.pendingSequencesQueue.GetChannel():
+                {
+                    if !ok {
+                        logs.LogInfo("producer %d processPendingSequencesQueue closed", producer.id)
+                        return
+                    }
+
+                    // the dequeue is blocking with a timeout of 500ms
+                    // as soon as a message is available the Dequeue will be unblocked
+                    //msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500)
+                    if producer.pendingSequencesQueue.IsStopped() {
+                        break
+                    }
+                    if msg != nil {
+                        // There is something in the queue. Check the buffer is still less than the maxFrame
+                        totalBufferToSend += msg.unCompressedSize
+                        if totalBufferToSend > maxFrame {
+                            // if the totalBufferToSend is greater than the requestedMaxFrameSize
+                            // the producer sends the messages and resets the buffer
+                            lastError = producer.internalBatchSend(sequenceToSend)
+                            sequenceToSend = sequenceToSend[:0]
+                            totalBufferToSend = initBufferPublishSize
+                        }
+
+                        sequenceToSend = append(sequenceToSend, msg)
+                    }
+
+                    canSend := producer.pendingSequencesQueue.IsEmpty() && producer.options.BatchPublishingDelay == 0
+                    // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending
+                    // the messages during the checks of the buffer. In this case
+                    if canSend || len(sequenceToSend) >= producer.options.BatchSize {
+                        if len(sequenceToSend) > 0 {
+                            avarage += len(sequenceToSend)
+                            iterations++
+                            if iterations > 100000 {
+                                logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations)
+                                avarage = 0
+                                iterations = 0
+                            }
+
+                            lastError = producer.internalBatchSend(sequenceToSend)
+                            sequenceToSend = sequenceToSend[:0]
+                            totalBufferToSend += initBufferPublishSize
+                        }
+                    }
+                }
+            case <-ticker.C:
+                if len(sequenceToSend) > 0 {
+                    lastError = producer.internalBatchSend(sequenceToSend)
+                    sequenceToSend = sequenceToSend[:0]
+                    totalBufferToSend += initBufferPublishSize
+                }
+            }

             if lastError != nil {
                 logs.LogError("error during sending messages: %s", lastError)
             }
         }
-
-        logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id)
     }()
+    logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id)
 }

 func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 {
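
The loop restored above is the usual size-or-timeout batching pattern. As a self-contained illustration, independent of the client's internal types (the string channel, batch size, delay and send callback below are hypothetical stand-ins for pendingSequencesQueue, BatchSize, BatchPublishingDelay and internalBatchSend), a minimal sketch:

package main

import (
    "fmt"
    "time"
)

// flushOnSizeOrTimeout drains msgs into batches: a batch is flushed either when it
// reaches batchSize or when the ticker fires with messages pending, mirroring the
// select loop in processPendingSequencesQueue.
func flushOnSizeOrTimeout(msgs <-chan string, batchSize int, delay time.Duration, send func([]string)) {
    ticker := time.NewTicker(delay)
    defer ticker.Stop()

    batch := make([]string, 0, batchSize)
    for {
        select {
        case msg, ok := <-msgs:
            if !ok {
                // channel closed: flush whatever is left and stop
                if len(batch) > 0 {
                    send(batch)
                }
                return
            }
            batch = append(batch, msg)
            if len(batch) >= batchSize {
                send(batch)
                batch = batch[:0] // send is synchronous here, so the backing array can be reused
            }
        case <-ticker.C:
            if len(batch) > 0 {
                send(batch)
                batch = batch[:0]
            }
        }
    }
}

func main() {
    msgs := make(chan string)
    go func() {
        for i := 0; i < 5; i++ {
            msgs <- fmt.Sprintf("message-%d", i)
            time.Sleep(30 * time.Millisecond)
        }
        close(msgs)
    }()
    // small batch size and a 50ms ticker: some flushes happen on size, some on timeout
    flushOnSizeOrTimeout(msgs, 3, 50*time.Millisecond, func(batch []string) {
        fmt.Println("flushed", len(batch), "messages")
    })
}

The trade-off mirrors the diff: the size check bounds latency under load, while the ticker bounds how long a partially filled batch can wait when traffic is sparse.
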
