Skip to content

Commit

Permalink
remove loop
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio committed Jan 13, 2025
1 parent 9ff963f commit 811d615
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 46 deletions.
5 changes: 2 additions & 3 deletions pkg/stream/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ var _ = Describe("Compression algorithms", func() {
}

message := &messageSequence{
messageBytes: messagePayload,
unCompressedSize: len(messagePayload),
publishingId: 0,
messageBytes: messagePayload,
publishingId: 0,
}

entries = &subEntries{
Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func (c *Client) heartBeat() {
tickerHeartbeat.Stop()
return
case <-tickerHeartbeat.C:
for c.socket.isOpen() {
if c.socket.isOpen() {
if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second {
v := atomic.AddInt32(&heartBeatMissed, 1)
logs.LogWarn("Missing heart beat: %d", v)
Expand Down
8 changes: 4 additions & 4 deletions pkg/stream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,10 @@ func (consumer *Consumer) close(reason Event) error {
consumer.closeHandler = nil
}

if consumer.chunkForConsumer != nil {
close(consumer.chunkForConsumer)
consumer.chunkForConsumer = nil
}
//if consumer.chunkForConsumer != nil {
close(consumer.chunkForConsumer)
//consumer.chunkForConsumer = nil
//}

if consumer.response.data != nil {
close(consumer.response.data)
Expand Down
5 changes: 0 additions & 5 deletions pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ func (coordinator *Coordinator) NewProducer(
status: open,
pendingSequencesQueue: NewBlockingQueue[*messageSequence](dynSize),
confirmMutex: &sync.Mutex{},
pool: &sync.Pool{
New: func() any {
return &messageSequence{}
},
},
}
coordinator.producers[lastId] = producer
return producer, err
Expand Down
25 changes: 7 additions & 18 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ type Producer struct {
publishConfirmation chan []*ConfirmationStatus

pendingSequencesQueue *BlockingQueue[*messageSequence]
pool *sync.Pool
}

type FilterValue func(message message.StreamMessage) string
Expand Down Expand Up @@ -296,7 +295,6 @@ func (producer *Producer) processPendingSequencesQueue() {
var lastError error
for {
select {

case msg, ok := <-producer.pendingSequencesQueue.GetChannel():
{
if !ok {
Expand Down Expand Up @@ -379,12 +377,12 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str
if producer.options.IsFilterEnabled() {
filterValue = producer.options.Filter.FilterValue(streamMessage)
}
fromPool := producer.pool.Get().(*messageSequence)
fromPool.messageBytes = marshalBinary
fromPool.publishingId = seq
fromPool.filterValue = filterValue
msqSeq := &messageSequence{}
msqSeq.messageBytes = marshalBinary
msqSeq.publishingId = seq
msqSeq.filterValue = filterValue

return fromPool, nil
return msqSeq, nil

}

Expand All @@ -403,7 +401,6 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
if len(messageSeq.messageBytes) > defaultMaxFrameSize {
tooLarge := producer.unConfirmed.extractWithError(messageSeq.publishingId, responseCodeFrameTooLarge)
producer.sendConfirmationStatus([]*ConfirmationStatus{tooLarge})
producer.pool.Put(messageSeq)
return FrameTooLarge
}

Expand Down Expand Up @@ -443,7 +440,6 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
for _, msg := range messagesSequence {
m := producer.unConfirmed.extractWithError(msg.publishingId, responseCodeFrameTooLarge)
producer.sendConfirmationStatus([]*ConfirmationStatus{m})
producer.pool.Put(msg)
}
return FrameTooLarge
}
Expand All @@ -457,13 +453,6 @@ func (producer *Producer) GetID() uint8 {
}

func (producer *Producer) internalBatchSend(messagesSequence []*messageSequence) error {
// remove form pool
defer func() {
for _, m := range messagesSequence {
producer.pool.Put(m)
}
}()

return producer.internalBatchSendProdId(messagesSequence, producer.GetID())
}

Expand Down Expand Up @@ -535,8 +524,8 @@ func (producer *Producer) aggregateEntities(msgs []*messageSequence, size int, c
// / the producer id is always the producer.GetID(). This function is needed only for testing
// some condition, like simulate publish error.
func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSequence, producerID uint8) error {
//producer.options.client.socket.mutexMessageMap.Lock()
//defer producer.options.client.socket.mutexMessageMap.Unlock()
producer.options.client.socket.mutex.Lock()
defer producer.options.client.socket.mutex.Unlock()
if producer.getStatus() == closed {
return fmt.Errorf("producer id: %d closed", producer.id)
}
Expand Down
23 changes: 8 additions & 15 deletions pkg/stream/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ var _ = Describe("Streaming Producers", func() {

})

It("Smart Send/Close", Focus, func() {
It("Smart Send/Close", func() {
producer, err := testEnvironment.NewProducer(testProducerStream, nil)
Expect(err).NotTo(HaveOccurred())
var messagesReceived int32
Expand Down Expand Up @@ -570,17 +570,15 @@ var _ = Describe("Streaming Producers", func() {
for i := 0; i < 1; i++ {
s := make([]byte, 50)
messagesSequence[i] = &messageSequence{
messageBytes: s,
unCompressedSize: len(s),
messageBytes: s,
}
}

msg := amqp.NewMessage([]byte("test"))
msg.SetPublishingId(1)
messageBytes, _ := msg.MarshalBinary()
messagesSequence[0] = &messageSequence{
messageBytes: messageBytes,
unCompressedSize: len(messageBytes),
messageBytes: messageBytes,
}

// 200 producer ID doesn't exist
Expand Down Expand Up @@ -659,8 +657,7 @@ var _ = Describe("Streaming Producers", func() {
for i := 0; i < 201; i++ {
s := make([]byte, 50)
messagesSequence[i] = &messageSequence{
messageBytes: s,
unCompressedSize: len(s),
messageBytes: s,
}
}

Expand All @@ -676,8 +673,7 @@ var _ = Describe("Streaming Producers", func() {

s := make([]byte, 50)
messagesSequence[i] = &messageSequence{
messageBytes: s,
unCompressedSize: len(s),
messageBytes: s,
}
}

Expand All @@ -692,8 +688,7 @@ var _ = Describe("Streaming Producers", func() {
for i := 0; i < 1; i++ {
s := make([]byte, 50)
messagesSequence[i] = &messageSequence{
messageBytes: s,
unCompressedSize: len(s),
messageBytes: s,
}
}

Expand All @@ -708,8 +703,7 @@ var _ = Describe("Streaming Producers", func() {
for i := 0; i < 1000; i++ {
s := make([]byte, 50)
messagesSequence[i] = &messageSequence{
messageBytes: s,
unCompressedSize: len(s),
messageBytes: s,
}
}

Expand All @@ -724,8 +718,7 @@ var _ = Describe("Streaming Producers", func() {
for i := 0; i < 14; i++ {
s := make([]byte, 50)
messagesSequence[i] = &messageSequence{
messageBytes: s,
unCompressedSize: len(s),
messageBytes: s,
}
}

Expand Down

0 comments on commit 811d615

Please sign in to comment.