diff --git a/Makefile b/Makefile
index 5e682f80..a7ab2bce 100644
--- a/Makefile
+++ b/Makefile
@@ -95,7 +95,7 @@ rabbitmq-ha-proxy:
 	mv compose/ha_tls/tls-gen/basic/result/server_*key.pem compose/ha_tls/tls-gen/basic/result/server_key.pem
 	cd compose/ha_tls; docker build -t haproxy-rabbitmq-cluster .
 	cd compose/ha_tls; docker-compose down
-	cd compose/ha_tls; docker-compose up -d
+	cd compose/ha_tls; docker-compose up

 rabbitmq-server-tls:
 	cd compose/tls; rm -rf tls-gen;
diff --git a/compose/ha_tls/docker-compose.yml b/compose/ha_tls/docker-compose.yml
index a1d4e82f..ecd5f2fc 100644
--- a/compose/ha_tls/docker-compose.yml
+++ b/compose/ha_tls/docker-compose.yml
@@ -7,10 +7,12 @@ services:
     networks:
       - back
     hostname: node0
-    image: docker.io/rabbitmq:3.13-rc-management
+    image: rabbitmq:3.13-rc-management
+    pull_policy: always
     ports:
       - "5561:5551"
       - "5562:5552"
+      - "5682:5672"
     tty: true
     volumes:
       - ./conf/:/etc/rabbitmq/
@@ -22,10 +24,12 @@ services:
     networks:
       - back
     hostname: node1
-    image: docker.io/rabbitmq:3.13-rc-management
+    image: rabbitmq:3.13-rc-management
+    pull_policy: always
     ports:
       - "5571:5551"
       - "5572:5552"
+      - "5692:5672"
     tty: true
     volumes:
       - ./conf/:/etc/rabbitmq/
@@ -37,10 +41,12 @@ services:
     networks:
       - back
     hostname: node2
-    image: docker.io/rabbitmq:3.13-rc-management
+    image: rabbitmq:3.13-rc-management
+    pull_policy: always
    ports:
       - "5581:5551"
       - "5582:5552"
+      - "5602:5672"
     tty: true
     volumes:
       - ./conf/:/etc/rabbitmq/
@@ -52,6 +58,7 @@ services:
     ports:
       - "5553:5552"
       - "5554:5551"
+      - "5674:5672"
       - "15673:15672"
     networks:
       - back
diff --git a/compose/ha_tls/haproxy.cfg b/compose/ha_tls/haproxy.cfg
index 57b1cff7..c9dfefa9 100644
--- a/compose/ha_tls/haproxy.cfg
+++ b/compose/ha_tls/haproxy.cfg
@@ -26,6 +26,13 @@ listen rabbitmq-stream
     server rabbit_node1 rabbit_node1:5552 check inter 5000 fall 3
     server rabbit_node2 rabbit_node2:5552 check inter 5000 fall 3

+listen rabbitmq-amqp
+    bind 0.0.0.0:5672
+    balance roundrobin
+    server rabbit_node0 rabbit_node0:5672 check inter 5000 fall 3
+    server rabbit_node1 rabbit_node1:5672 check inter 5000 fall 3
+    server rabbit_node2 rabbit_node2:5672 check inter 5000 fall 3
+
 listen rabbitmq-ui
     bind 0.0.0.0:15672
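Note: the new `rabbitmq-amqp` listener load-balances plain AMQP (port 5672) across the three nodes, next to the existing stream listeners. Stream clients that go through a load balancer usually also need an address resolver, otherwise the client tries to connect directly to the broker address advertised in the cluster metadata. A minimal sketch with the Go stream client, assuming the host port mapping `5553:5552` for HAProxy's stream front end from the compose file above:

```go
package main

import (
	"fmt"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func main() {
	// Force every metadata-resolved broker address back to the
	// load balancer, so all connections are round-robined by HAProxy.
	addressResolver := stream.AddressResolver{
		Host: "localhost",
		Port: 5553, // host port mapped to HAProxy's stream listener
	}
	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost(addressResolver.Host).
			SetPort(addressResolver.Port).
			SetAddressResolver(addressResolver))
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() { _ = env.Close() }()
	fmt.Println("connected through HAProxy")
}
```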
diff --git a/examples/haProducer/producer.go b/examples/haProducer/producer.go
index ad688a24..c4d092d0 100644
--- a/examples/haProducer/producer.go
+++ b/examples/haProducer/producer.go
@@ -7,8 +7,6 @@ package main
 import (
 	"bufio"
 	"fmt"
-	"github.com/google/uuid"
-	"github.com/rabbitmq/rabbitmq-stream-go-client/examples/haProducer/http"
 	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
 	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
 	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
@@ -26,21 +24,22 @@ func CheckErr(err error) {
 	}
 }

-var counter int32 = 0
+var confirmed int32 = 0
 var fail int32 = 0
+var sent int32
+var mutex = sync.Mutex{}
+var unConfirmedMessages []message.StreamMessage

 func handlePublishConfirm(messageStatus []*stream.ConfirmationStatus) {
 	go func() {
-		for _, message := range messageStatus {
-			if message.IsConfirmed() {
-
-				if atomic.AddInt32(&counter, 1)%20000 == 0 {
-					fmt.Printf("Confirmed %d messages\n", atomic.LoadInt32(&counter))
-				}
+		for _, msgStatus := range messageStatus {
+			if msgStatus.IsConfirmed() {
+				atomic.AddInt32(&confirmed, 1)
 			} else {
-				if atomic.AddInt32(&fail, 1)%20000 == 0 {
-					fmt.Printf("NOT Confirmed %d messages\n", atomic.LoadInt32(&fail))
-				}
+				atomic.AddInt32(&fail, 1)
+				mutex.Lock()
+				unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage())
+				mutex.Unlock()
 			}
 		}
@@ -52,6 +50,7 @@ func main() {
 	fmt.Println("HA producer example")
 	fmt.Println("Connecting to RabbitMQ streaming ...")

+	const messagesToSend = 20_000_000

 	addresses := []string{
 		"rabbitmq-stream://guest:guest@localhost:5552/%2f",
@@ -63,83 +62,50 @@ func main() {
 		SetUris(addresses))
 	CheckErr(err)

-	streamName := uuid.New().String()
+	streamName := "golang-reliable-producer-Test"
+	env.DeleteStream(streamName)
+
 	err = env.DeclareStream(streamName,
 		&stream.StreamOptions{
 			MaxLengthBytes: stream.ByteCapacity{}.GB(2),
 		},
 	)

-	rProducer, err := ha.NewHAProducer(env, streamName, nil, handlePublishConfirm)
-	CheckErr(err)
-	rProducer1, err := ha.NewHAProducer(env, streamName, nil, handlePublishConfirm)
+	rProducer, err := ha.NewReliableProducer(env,
+		streamName,
+		stream.NewProducerOptions().SetConfirmationTimeOut(5*time.Second), handlePublishConfirm)
 	CheckErr(err)
-
-	wg := sync.WaitGroup{}
-	var sent int32
-	for i := 0; i < 10; i++ {
-		wg.Add(1)
-		go func(wg *sync.WaitGroup) {
-			for i := 0; i < 100000; i++ {
-				msg := amqp.NewMessage([]byte("ha"))
-				err := rProducer.Send(msg)
-				CheckErr(err)
-				err = rProducer1.BatchSend([]message.StreamMessage{msg})
-				if atomic.AddInt32(&sent, 2)%20000 == 0 {
-					time.Sleep(100 * time.Millisecond)
-					fmt.Printf("Sent..%d messages\n", atomic.LoadInt32(&sent))
-				}
-				if err != nil {
-					break
-				}
-			}
-			wg.Done()
-		}(&wg)
-	}
-	isActive := true
+	isRunning := true
 	go func() {
-		for isActive {
-			coo, err := http.Connections("15672")
-			if err != nil {
-				return
-			}
-
-			for _, connection := range coo {
-				_ = http.DropConnection(connection.Name, "15672")
-			}
-			time.Sleep(2 * time.Second)
+		for isRunning {
+			totalHandled := atomic.LoadInt32(&confirmed) + atomic.LoadInt32(&fail)
+			fmt.Printf("%s - ToSend: %d - Sent:%d - Confirmed:%d - Not confirmed:%d - Total :%d \n",
+				time.Now().Format(time.RFC822), messagesToSend, sent, confirmed, fail, totalHandled)
+			time.Sleep(5 * time.Second)
 		}
 	}()
-	wg.Wait()
-	isActive = false
-	time.Sleep(2 * time.Second)
-
-	fmt.Println("Terminated. Press any key to see the report. ")
-	_, _ = reader.ReadString('\n')
-	time.Sleep(200 * time.Millisecond)
-	totalHandled := atomic.LoadInt32(&counter) + atomic.LoadInt32(&fail)
-	fmt.Printf("[Report]\n - Sent:%d \n - Confirmed:%d\n - Not confirmed:%d\n - Total messages handeld:%d \n",
-		sent, counter, fail, totalHandled)
-	if sent == totalHandled {
-		fmt.Printf(" - Messages sent %d match with handled: %d! yea! \n\n", sent, totalHandled)
-	}
-
-	if totalHandled > sent {
-		fmt.Printf(" - Messages sent %d are lower than handled: %d! some duplication, can happens ! \n\n", sent, totalHandled)
-	}
-
-	if sent > totalHandled {
-		fmt.Printf(" - Messages handled %d are lower than send: %d! that's not good!\n\n", totalHandled, sent)
+	for i := 0; i < messagesToSend; i++ {
+		msg := amqp.NewMessage([]byte("ha"))
+		mutex.Lock()
+		for _, confirmedMessage := range unConfirmedMessages {
+			err := rProducer.Send(confirmedMessage)
+			atomic.AddInt32(&sent, 1)
+			CheckErr(err)
+		}
+		unConfirmedMessages = []message.StreamMessage{}
+		mutex.Unlock()
+		err := rProducer.Send(msg)
+		atomic.AddInt32(&sent, 1)
+		CheckErr(err)
 	}
+	fmt.Println("Terminated. Press enter to close the connections.")
+	_, _ = reader.ReadString('\n')
+	isRunning = false

 	err = rProducer.Close()
 	CheckErr(err)
-	err = rProducer1.Close()
-	CheckErr(err)
-	err = env.DeleteStream(streamName)
-	CheckErr(err)
 	err = env.Close()
 	CheckErr(err)
 }
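The reworked example gives at-least-once semantics: a message whose confirmation timed out may in fact have been written, so the resend loop can produce duplicates. If duplicates matter, the client's server-side deduplication can absorb them. A hypothetical sketch (the producer name and ids are illustrative; `env`, `streamName`, and `CheckErr` as in the example above):

```go
// Server-side deduplication: with a named producer, re-sending a
// publishing id the broker has already stored is discarded.
producer, err := env.NewProducer(streamName,
	stream.NewProducerOptions().SetProducerName("golang-dedup-producer")) // hypothetical name
CheckErr(err)
for i := 0; i < 100; i++ {
	msg := amqp.NewMessage([]byte("ha"))
	msg.SetPublishingId(int64(i)) // stable id: a resend of id i is a no-op
	CheckErr(producer.Send(msg))
}
```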
diff --git a/generate/generate_amqp10_messages.go b/generate/generate_amqp10_messages.go
index 571bda59..d8656272 100644
--- a/generate/generate_amqp10_messages.go
+++ b/generate/generate_amqp10_messages.go
@@ -59,6 +59,7 @@ func main() {
 	msg := amqp.NewMessage([]byte(""))

 	binary, err := msg.MarshalBinary()
+	//msg.UnmarshalBinary()
 	if err != nil {
 		return
 	}
diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go
index 779679f8..6616d7b1 100644
--- a/perfTest/cmd/silent.go
+++ b/perfTest/cmd/silent.go
@@ -264,7 +264,7 @@ func startPublisher(streamName string) error {
 		logInfo("Enable SubEntrySize: %d, compression: %s", subEntrySize, cp)
 	}

-	rPublisher, err := ha.NewHAProducer(simulEnvironment,
+	rPublisher, err := ha.NewReliableProducer(simulEnvironment,
 		streamName,
 		producerOptions,
 		handlePublishConfirms)
@@ -315,9 +315,11 @@ func startPublisher(streamName string) error {
 			}

 			atomic.AddInt64(&messagesSent, int64(len(arr)))
-			err = prod.BatchSend(arr)
+			for _, streamMessage := range arr {
+				err = prod.Send(streamMessage)
+				checkErr(err)
+			}
 			atomic.AddInt32(&publisherMessageCount, int32(len(arr)))
-			checkErr(err)
 		}
 	}(rPublisher, arr)
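Replacing `BatchSend(arr)` with a per-message `Send` loop does not give up wire-level batching: `Send` only enqueues into the producer's internal buffer, which the publish task flushes on a timer or a size threshold (see the `pkg/stream/producer.go` changes below). A sketch of the two knobs involved, with the library defaults spelled out:

```go
package main

import "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"

// perMessageSendOptions spells out the assumption behind the change above:
// buffered messages are flushed every BatchPublishingDelay milliseconds or
// every BatchSize messages, whichever comes first (values shown are the
// defaults from pkg/stream/constants.go).
func perMessageSendOptions() *stream.ProducerOptions {
	return stream.NewProducerOptions().
		SetBatchSize(100).
		SetBatchPublishingDelay(100)
}
```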
Reconnecting..") + err, reconnected := p.retry() + if err != nil { + // TODO: Handle stream is not available + return + } + if reconnected { + p.setStatus(StatusOpen) + } + p.reconnectionSignal <- struct{}{} + } + } + }() +} + type ReliableProducer struct { env *stream.Environment producer *stream.Producer @@ -36,11 +58,12 @@ type ReliableProducer struct { mutex *sync.Mutex mutexStatus *sync.Mutex status int + reconnectionSignal chan struct{} } type ConfirmMessageHandler func(messageConfirm []*stream.ConfirmationStatus) -func NewHAProducer(env *stream.Environment, streamName string, +func NewReliableProducer(env *stream.Environment, streamName string, producerOptions *stream.ProducerOptions, confirmMessageHandler ConfirmMessageHandler) (*ReliableProducer, error) { res := &ReliableProducer{ @@ -52,6 +75,7 @@ func NewHAProducer(env *stream.Environment, streamName string, mutex: &sync.Mutex{}, mutexStatus: &sync.Mutex{}, confirmMessageHandler: confirmMessageHandler, + reconnectionSignal: make(chan struct{}), } if confirmMessageHandler == nil { return nil, fmt.Errorf("the confirmation message handler is mandatory") @@ -71,6 +95,8 @@ func (p *ReliableProducer) newProducer() error { return err } channelPublishConfirm := producer.NotifyPublishConfirmation() + channelNotifyClose := producer.NotifyClose() + p.handleNotifyClose(channelNotifyClose) p.handlePublishConfirm(channelPublishConfirm) p.producer = producer return err @@ -81,50 +107,23 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error { return stream.StreamDoesNotExist } if p.getStatus() == StatusClosed { - return errors.New("Producer is closed") + return errors.New("producer is closed") } - p.mutex.Lock() - defer p.mutex.Unlock() - - errW := p.producer.Send(message) - - if errW != nil { - switch errW { - case stream.FrameTooLarge: - { - return stream.FrameTooLarge - } - default: - logs.LogError("[RProducer] - error during send %s", errW.Error()) - } + if p.getStatus() == StatusReconnecting { + logs.LogDebug("[RProducer] - producer is reconnecting") + <-p.reconnectionSignal + logs.LogDebug("[RProducer] - producer reconnected") } - if errW != nil { - err, done := p.retry() - if done { - return err - } - } - - return nil -} - -func (p *ReliableProducer) BatchSend(messages []message.StreamMessage) error { - if p.getStatus() == StatusStreamDoesNotExist { - return stream.StreamDoesNotExist - } - if p.getStatus() == StatusClosed { - return errors.New("Producer is closed") - } p.mutex.Lock() defer p.mutex.Unlock() - errW := p.producer.BatchSend(messages) + errW := p.producer.Send(message) if errW != nil { - switch errW { - case stream.FrameTooLarge: + switch { + case errors.Is(errW, stream.FrameTooLarge): { return stream.FrameTooLarge } @@ -134,18 +133,13 @@ func (p *ReliableProducer) BatchSend(messages []message.StreamMessage) error { } - if errW != nil { - err, done := p.retry() - if done { - return err - } - } - return nil } func (p *ReliableProducer) retry() (error, bool) { - time.Sleep(200 * time.Millisecond) + p.setStatus(StatusReconnecting) + sleepValue := rand.Intn(int(p.producerOptions.ConfirmationTimeOut.Seconds()-2+1) + 2) + time.Sleep(time.Duration(sleepValue) * time.Second) exists, errS := p.env.StreamExists(p.streamName) if errS != nil { return errS, true @@ -153,7 +147,6 @@ func (p *ReliableProducer) retry() (error, bool) { } if exists { logs.LogDebug("[RProducer] - stream %s exists. 
diff --git a/pkg/stream/client.go b/pkg/stream/client.go
index b4133240..7867b51a 100644
--- a/pkg/stream/client.go
+++ b/pkg/stream/client.go
@@ -515,6 +515,7 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
 		BatchPublishingDelay: options.BatchPublishingDelay,
 		SubEntrySize:         options.SubEntrySize,
 		Compression:          options.Compression,
+		ConfirmationTimeOut:  options.ConfirmationTimeOut,
 	})

 	if err != nil {
@@ -523,6 +524,7 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
 	res := c.internalDeclarePublisher(streamName, producer)
 	if res.Err == nil {
 		producer.startPublishTask()
+		producer.startUnconfirmedMessagesTimeOutTask()
 	}
 	return producer, res.Err
 }
diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go
index 3057953b..b46724ec 100644
--- a/pkg/stream/constants.go
+++ b/pkg/stream/constants.go
@@ -75,6 +75,8 @@ const (
 	/// responses out of protocol
 	closeChannel         = uint16(60)
 	connectionCloseError = uint16(61)
+	timeoutError         = uint16(62)
+
 	///
 	defaultSocketCallTimeout = 10 * time.Second
@@ -99,6 +101,7 @@ const (
 	defaultBatchSize            = 100
 	defaultBatchPublishingDelay = 100
+	defaultConfirmationTimeOut  = 10 * time.Second
 	//
 	StreamTcpPort = "5552"
@@ -121,6 +124,7 @@ var StreamNotAvailable = errors.New("Stream Not Available")
 var UnknownFrame = errors.New("Unknown Frame")
 var InternalError = errors.New("Internal Error")
 var AuthenticationFailureLoopbackError = errors.New("Authentication Failure Loopback Error")
+var ConfirmationTimoutError = errors.New("Confirmation Timeout Error")

 var LeaderNotReady = errors.New("Leader not Ready yet")
diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go
index d659a66d..8e24059d 100644
--- a/pkg/stream/coordinator.go
+++ b/pkg/stream/coordinator.go
@@ -102,11 +102,11 @@ func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error
 		time.Sleep(200 * time.Millisecond)
 		tentatives++
 	}
-	producer.FlushUnConfirmedMessages()

 	if producer.closeHandler != nil {
 		producer.closeHandler <- reason
 	}
+
 	return coordinator.removeById(id, coordinator.producers)
 }
diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go
index 1fa39784..34bfe76d 100644
--- a/pkg/stream/environment.go
+++ b/pkg/stream/environment.go
@@ -109,6 +109,9 @@ func (env *Environment) DeclareStream(streamName string, options *StreamOptions)
 		if err != nil {
 			return
 		}
+		if err != nil {
+			return
+		}
 	}(client)
 	if err != nil {
 		return err
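With the new option wired through `DeclarePublisher`, a pending message now fails with `ConfirmationTimoutError` (the identifier is spelled this way in the code) after `ConfirmationTimeOut` elapses, instead of sitting unconfirmed forever. A sketch of handling it, assuming `ConfirmationStatus` exposes `GetError()`/`GetMessage()` getters for the fields shown in `pkg/stream/producer.go` below:

```go
package main

import (
	"errors"
	"time"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

// newTimeoutAwareProducer is a hypothetical helper: timed-out messages
// come back through the confirmation handler and can be re-queued there.
func newTimeoutAwareProducer(env *stream.Environment, streamName string) (*ha.ReliableProducer, error) {
	return ha.NewReliableProducer(env, streamName,
		stream.NewProducerOptions().SetConfirmationTimeOut(5*time.Second),
		func(confirms []*stream.ConfirmationStatus) {
			for _, c := range confirms {
				if !c.IsConfirmed() && errors.Is(c.GetError(), stream.ConfirmationTimoutError) {
					_ = c.GetMessage() // candidate for a resend
				}
			}
		})
}
```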
diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go
index b0dbe495..7c219111 100644
--- a/pkg/stream/producer.go
+++ b/pkg/stream/producer.go
@@ -12,6 +12,7 @@ import (
 )

 type ConfirmationStatus struct {
+	inserted     time.Time
 	message      message.StreamMessage
 	producerID   uint8
 	publishingId int64
@@ -80,12 +81,13 @@ type Producer struct {
 type ProducerOptions struct {
 	client     *Client
 	streamName string
-	Name                 string      // Producer name, it is useful to handle deduplication messages
-	QueueSize            int         // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
-	BatchSize            int         // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput
-	BatchPublishingDelay int         // Period to send a batch of messages.
-	SubEntrySize         int         // Size of sub Entry, to aggregate more subEntry using one publishing id
-	Compression          Compression // Compression type, it is valid only if SubEntrySize > 1
+	Name                 string        // Producer name, it is useful to handle deduplication messages
+	QueueSize            int           // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
+	BatchSize            int           // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput
+	BatchPublishingDelay int           // Period to send a batch of messages.
+	SubEntrySize         int           // Size of sub Entry, to aggregate more subEntry using one publishing id
+	Compression          Compression   // Compression type, it is valid only if SubEntrySize > 1
+	ConfirmationTimeOut  time.Duration // Time to wait for the confirmation
 }

 func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions {
@@ -118,6 +120,11 @@ func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOpti
 	return po
 }

+func (po *ProducerOptions) SetConfirmationTimeOut(duration time.Duration) *ProducerOptions {
+	po.ConfirmationTimeOut = duration
+	return po
+}
+
 func NewProducerOptions() *ProducerOptions {
 	return &ProducerOptions{
 		QueueSize:            defaultQueuePublisherSize,
@@ -125,6 +132,7 @@ func NewProducerOptions() *ProducerOptions {
 		BatchPublishingDelay: defaultBatchPublishingDelay,
 		SubEntrySize:         1,
 		Compression:          Compression{},
+		ConfirmationTimeOut:  defaultConfirmationTimeOut,
 	}
 }

@@ -138,6 +146,7 @@ func (producer *Producer) addUnConfirmed(sequence int64, message message.StreamM
 	producer.mutex.Lock()
 	defer producer.mutex.Unlock()
 	producer.unConfirmedMessages[sequence] = &ConfirmationStatus{
+		inserted:     time.Now(),
 		message:      message,
 		producerID:   producerID,
 		publishingId: sequence,
@@ -215,6 +224,32 @@ func (producer *Producer) sendBufferedMessages() {
 		producer.pendingMessages.size = initBufferPublishSize
 	}
 }
+
+func (producer *Producer) startUnconfirmedMessagesTimeOutTask() {
+
+	go func() {
+		for producer.getStatus() == open {
+			time.Sleep(2 * time.Second)
+			producer.mutex.Lock()
+			for _, msg := range producer.unConfirmedMessages {
+				if time.Since(msg.inserted) > producer.options.ConfirmationTimeOut {
+					msg.err = ConfirmationTimoutError
+					msg.errorCode = timeoutError
+					msg.confirmed = false
+					if producer.publishConfirm != nil {
+						producer.publishConfirm <- []*ConfirmationStatus{msg}
+					}
+					delete(producer.unConfirmedMessages, msg.publishingId)
+				}
+			}
+			producer.mutex.Unlock()
+		}
+		time.Sleep(5 * time.Second)
+		producer.flushUnConfirmedMessages(timeoutError, ConfirmationTimoutError)
+	}()
+
+}
+
 func (producer *Producer) startPublishTask() {
 	go func(ch chan messageSequence) {
 		var ticker = time.NewTicker(time.Duration(producer.options.BatchPublishingDelay) * time.Millisecond)
@@ -225,7 +260,8 @@ func (producer *Producer) startPublishTask() {
 			case msg, running := <-ch:
 				{
 					if !running {
-						producer.FlushUnConfirmedMessages()
+
+						producer.flushUnConfirmedMessages(connectionCloseError, ConnectionClosed)
 						if producer.publishConfirm != nil {
 							close(producer.publishConfirm)
 							producer.publishConfirm = nil
@@ -465,25 +501,25 @@ func (producer *Producer) internalBatchSendProdId(messagesSequence []messageSequ
 	if err != nil {
 		// This sleep is need to wait the
 		// 800 milliseconds to flush all the pending messages
-		producer.setStatus(closed)
-		producer.FlushUnConfirmedMessages()
 		return err
 	}

 	return nil
 }

-func (producer *Producer) FlushUnConfirmedMessages() {
+func (producer *Producer) flushUnConfirmedMessages(errorCode uint16, err error) {
 	producer.mutex.Lock()
-	if producer.publishConfirm != nil {
-		for _, msg := range producer.unConfirmedMessages {
-			msg.confirmed = false
-			msg.err = ConnectionClosed
-			msg.errorCode = connectionCloseError
+
+	for _, msg := range producer.unConfirmedMessages {
+		msg.confirmed = false
+		msg.err = err
+		msg.errorCode = errorCode
+		if producer.publishConfirm != nil {
 			producer.publishConfirm <- []*ConfirmationStatus{msg}
-			delete(producer.unConfirmedMessages, msg.publishingId)
 		}
+		delete(producer.unConfirmedMessages, msg.publishingId)
 	}
+
 	producer.mutex.Unlock()
 }

@@ -521,7 +557,6 @@ func (producer *Producer) Close() error {
 	}

 	close(producer.messageSequenceCh)
-
 	return nil
 }
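`startUnconfirmedMessagesTimeOutTask` is a sweeper: every two seconds it scans the unconfirmed map under the producer mutex and expires entries older than `ConfirmationTimeOut`. A generic, runnable distillation of the pattern (names are illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// inFlight tracks when each in-flight entry was inserted, like the new
// ConfirmationStatus.inserted field above.
type inFlight struct {
	mu      sync.Mutex
	entries map[int64]time.Time
}

// sweep expires entries older than timeout. Deleting from a map while
// ranging over it is well-defined in Go, which the sweeper relies on.
func (f *inFlight) sweep(timeout time.Duration, onExpired func(id int64)) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for id, inserted := range f.entries {
		if time.Since(inserted) > timeout {
			onExpired(id)
			delete(f.entries, id)
		}
	}
}

func main() {
	f := &inFlight{entries: map[int64]time.Time{
		1: time.Now().Add(-3 * time.Second), // stale: will expire
		2: time.Now(),                       // fresh: survives
	}}
	f.sweep(2*time.Second, func(id int64) { fmt.Println("expired:", id) })
}
```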
diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go
index f9188e1d..2c811903 100644
--- a/pkg/stream/producer_test.go
+++ b/pkg/stream/producer_test.go
@@ -138,7 +138,11 @@ var _ = Describe("Streaming Producers", func() {

 		BeforeEach(func() {
 			producer = createProducer(
-				NewProducerOptions().SetBatchSize(BatchSize).SetSubEntrySize(SubEntrySize).SetCompression(Compression{}.None()),
+				NewProducerOptions().
+					SetBatchSize(BatchSize).
+					SetSubEntrySize(SubEntrySize).
+					SetConfirmationTimeOut(1*time.Second).
+					SetCompression(Compression{}.None()),
 				&messagesReceived,
 				testEnvironment,
 				testProducerStream,
@@ -229,7 +233,7 @@ var _ = Describe("Streaming Producers", func() {
 		}, 5*time.Second).Should(Equal(int32(14)),
 			"confirm should receive same messages send by producer")

-		Expect(len(producer.unConfirmedMessages)).To(Equal(0))
+		Expect(producer.lenUnConfirmed()).To(Equal(0))
 		Expect(producer.Close()).NotTo(HaveOccurred())
 	})

@@ -708,7 +712,7 @@ var _ = Describe("Streaming Producers", func() {
 			SetSubEntrySize(33).SetCompression(Compression{}.Gzip()))
 		Expect(err).NotTo(HaveOccurred())
 		testCompress(producerGZIP)
-		Expect(len(producerGZIP.unConfirmedMessages)).To(Equal(0))
+		Expect(producerGZIP.lenUnConfirmed()).To(Equal(0))
 		Expect(producerGZIP.Close()).NotTo(HaveOccurred())

 		producerLz4, err := testEnvironment.NewProducer(testProducerStream,
@@ -716,7 +720,7 @@ var _ = Describe("Streaming Producers", func() {
 			SetSubEntrySize(55).SetCompression(Compression{}.Lz4()))
 		Expect(err).NotTo(HaveOccurred())
 		testCompress(producerLz4)
-		Expect(len(producerLz4.unConfirmedMessages)).To(Equal(0))
+		Expect(producerLz4.lenUnConfirmed()).To(Equal(0))
 		Expect(producerLz4.Close()).NotTo(HaveOccurred())

 		producerSnappy, err := testEnvironment.NewProducer(testProducerStream,
@@ -724,7 +728,7 @@ var _ = Describe("Streaming Producers", func() {
 			SetSubEntrySize(666).SetCompression(Compression{}.Snappy()))
 		Expect(err).NotTo(HaveOccurred())
 		testCompress(producerSnappy)
-		Expect(len(producerSnappy.unConfirmedMessages)).To(Equal(0))
+		Expect(producerSnappy.lenUnConfirmed()).To(Equal(0))
 		Expect(producerSnappy.Close()).NotTo(HaveOccurred())

 		producerZstd, err := testEnvironment.NewProducer(testProducerStream,
@@ -732,7 +736,7 @@ var _ = Describe("Streaming Producers", func() {
 			SetSubEntrySize(98).SetCompression(Compression{}.Zstd()))
 		Expect(err).NotTo(HaveOccurred())
 		testCompress(producerZstd)
-		Expect(len(producerZstd.unConfirmedMessages)).To(Equal(0))
+		Expect(producerZstd.lenUnConfirmed()).To(Equal(0))
 		Expect(producerZstd.Close()).NotTo(HaveOccurred())
 	})
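The tests now go through a `lenUnConfirmed()` accessor instead of reading `producer.unConfirmedMessages` directly, which would race with the new sweeper goroutine. The helper itself is not part of this diff; presumably it is a mutex-guarded read along these lines:

```go
// Sketch of the accessor the tests rely on (not shown in this diff).
func (producer *Producer) lenUnConfirmed() int {
	producer.mutex.Lock()
	defer producer.mutex.Unlock()
	return len(producer.unConfirmedMessages)
}
```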
diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go
index 807707f9..b4d2eff3 100644
--- a/pkg/stream/server_frame.go
+++ b/pkg/stream/server_frame.go
@@ -415,9 +415,8 @@ func (c *Client) decodeMessage(r *bufio.Reader, filter bool, offset int64, offse

 func (c *Client) creditNotificationFrameHandler(readProtocol *ReaderProtocol,
 	r *bufio.Reader) {
 	readProtocol.ResponseCode = uShortExtractResponseCode(readUShort(r))
-	//subscriptionId := readByte(r)
-	_ = readByte(r)
-	// TODO ASK WHAT TO DO HERE
+	subscriptionId := readByte(r)
+	logs.LogWarn("received a credit for an unknown subscriptionId: %d", subscriptionId)
 }

 func (c *Client) queryOffsetFrameHandler(readProtocol *ReaderProtocol,