Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio committed Jan 9, 2025
1 parent 1bf978b commit c3c52cb
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
19 changes: 15 additions & 4 deletions pkg/stream/blocking_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ import (
var ErrBlockingQueueStopped = errors.New("blocking queue stopped")

type BlockingQueue[T any] struct {
queue chan T
status int32
queue chan T
status int32
capacity int
lastUpdate time.Time
}

// NewBlockingQueue initializes a new BlockingQueue with the given capacity
func NewBlockingQueue[T any](capacity int) *BlockingQueue[T] {
return &BlockingQueue[T]{
queue: make(chan T, capacity),
status: 0,
queue: make(chan T, capacity),
capacity: capacity,
status: 0,
}
}

Expand All @@ -27,6 +30,7 @@ func (bq *BlockingQueue[T]) Enqueue(item T) error {
if bq.IsStopped() {
return ErrBlockingQueueStopped
}
bq.lastUpdate = time.Now()
bq.queue <- item

return nil
Expand Down Expand Up @@ -59,6 +63,13 @@ func (bq *BlockingQueue[T]) IsEmpty() bool {
return len(bq.queue) == 0
}

func (bq *BlockingQueue[T]) IsReadyToSend() bool {
if bq.lastUpdate.IsZero() {
return true
}
return time.Since(bq.lastUpdate) > 10*time.Millisecond && len(bq.queue) == 0
}

// Stop stops the queue from accepting new items
// but allows some pending items.
// Stop is different from Close in that it allows the
Expand Down
13 changes: 12 additions & 1 deletion pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ func (producer *Producer) closeConfirmationStatus() {
func (producer *Producer) processPendingSequencesQueue() {

maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize
var avarage int32
iterations := 0

// the buffer is initialized with the size of the header
sequenceToSend := make([]*messageSequence, 0)
go func() {
Expand Down Expand Up @@ -316,8 +319,16 @@ func (producer *Producer) processPendingSequencesQueue() {

// if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending
// the messages during the checks of the buffer. In this case
if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize {
if producer.pendingSequencesQueue.IsReadyToSend() || len(sequenceToSend) >= producer.options.BatchSize {
if len(sequenceToSend) > 0 {
avarage = atomic.AddInt32(&avarage, int32(len(sequenceToSend)))

Check failure on line 324 in pkg/stream/producer.go

View workflow job for this annotation

GitHub Actions / test (1.22)

direct assignment to atomic value
iterations++
if iterations > 10000 {
logs.LogInfo("producer %d avarage: %d", producer.id, avarage/int32(iterations))
avarage = 0
iterations = 0
}

lastError = producer.internalBatchSend(sequenceToSend)
sequenceToSend = sequenceToSend[:0]
totalBufferToSend += initBufferPublishSize
Expand Down

0 comments on commit c3c52cb

Please sign in to comment.