diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 3f2c9916..4d56be39 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -101,8 +101,7 @@ type ProducerOptions struct { // It is not used anymore given the dynamic batching QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send() - // Deprecated: starting from 1.5.0 the BatchPublishingDelay is deprecated, and it will be removed in the next releases - // It is not used anymore given the dynamic batching + BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. Valid only for the method Send() // Size of sub Entry, to aggregate more subEntry using one publishing id SubEntrySize int @@ -289,57 +288,77 @@ func (producer *Producer) processPendingSequencesQueue() { maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize var avarage = 0 iterations := 0 - // the buffer is initialized with the size of the header go func() { + var ticker = time.NewTicker(time.Duration(producer.options.BatchPublishingDelay) * time.Millisecond) + defer ticker.Stop() + sequenceToSend := make([]*messageSequence, 0) totalBufferToSend := initBufferPublishSize - for msg := range producer.pendingSequencesQueue.GetChannel() { - - var lastError error - // the dequeue is blocking with a timeout of 500ms - // as soon as a message is available the Dequeue will be unblocked - //msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500) - if producer.pendingSequencesQueue.IsStopped() { - break - } - if msg != nil { - // There is something in the queue.Checks the buffer is still less than the maxFrame - totalBufferToSend += msg.unCompressedSize - if totalBufferToSend > maxFrame { - // if the totalBufferToSend is greater than the requestedMaxFrameSize - // the producer sends the messages and reset the buffer - lastError = producer.internalBatchSend(sequenceToSend) - sequenceToSend = sequenceToSend[:0] - totalBufferToSend = initBufferPublishSize - } + var lastError error + for { + select { - sequenceToSend = append(sequenceToSend, msg) - } + case msg, ok := <-producer.pendingSequencesQueue.GetChannel(): + { + if !ok { + logs.LogInfo("producer %d processPendingSequencesQueue closed", producer.id) + return + } - // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending - // the messages during the checks of the buffer. In this case - if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize { - if len(sequenceToSend) > 0 { - avarage += len(sequenceToSend) - iterations++ - if iterations > 100000 { - logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations) - avarage = 0 - iterations = 0 + // the dequeue is blocking with a timeout of 500ms + // as soon as a message is available the Dequeue will be unblocked + //msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500) + if producer.pendingSequencesQueue.IsStopped() { + break + } + if msg != nil { + // There is something in the queue.Checks the buffer is still less than the maxFrame + totalBufferToSend += msg.unCompressedSize + if totalBufferToSend > maxFrame { + // if the totalBufferToSend is greater than the requestedMaxFrameSize + // the producer sends the messages and reset the buffer + lastError = producer.internalBatchSend(sequenceToSend) + sequenceToSend = sequenceToSend[:0] + totalBufferToSend = initBufferPublishSize + } + + sequenceToSend = append(sequenceToSend, msg) } + canSend := producer.pendingSequencesQueue.IsEmpty() && producer.options.BatchPublishingDelay == 0 + // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending + // the messages during the checks of the buffer. In this case + if canSend || len(sequenceToSend) >= producer.options.BatchSize { + if len(sequenceToSend) > 0 { + avarage += len(sequenceToSend) + iterations++ + if iterations > 100000 { + logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations) + avarage = 0 + iterations = 0 + } + + lastError = producer.internalBatchSend(sequenceToSend) + sequenceToSend = sequenceToSend[:0] + totalBufferToSend += initBufferPublishSize + } + } + } + case <-ticker.C: + if len(sequenceToSend) > 0 { lastError = producer.internalBatchSend(sequenceToSend) sequenceToSend = sequenceToSend[:0] totalBufferToSend += initBufferPublishSize } } + if lastError != nil { logs.LogError("error during sending messages: %s", lastError) } } - logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) }() + logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) } func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 {