Skip to content

Commit

Permalink
Improve the ReliableProducer during the broker restart (#268)
Browse files Browse the repository at this point in the history
* Closes #265

* add random sleep during the producer restart
* check the stream status during the restart
* add tests for the reliable producer
---------

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio authored Feb 27, 2024
1 parent 9e48184 commit a503d8a
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 63 deletions.
6 changes: 3 additions & 3 deletions compose/ha_tls/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
networks:
- back
hostname: node0
image: rabbitmq:3.13-rc-management
image: rabbitmq:3.13-management
pull_policy: always
ports:
- "5561:5551"
Expand All @@ -24,7 +24,7 @@ services:
networks:
- back
hostname: node1
image: rabbitmq:3.13-rc-management
image: rabbitmq:3.13-management
pull_policy: always
ports:
- "5571:5551"
Expand All @@ -41,7 +41,7 @@ services:
networks:
- back
hostname: node2
image: rabbitmq:3.13-rc-management
image: rabbitmq:3.13-management
pull_policy: always
ports:
- "5581:5551"
Expand Down
94 changes: 53 additions & 41 deletions examples/haProducer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,23 @@ func CheckErr(err error) {

var confirmed int32 = 0
var fail int32 = 0
var mutex = sync.Mutex{}
var unConfirmedMessages []message.StreamMessage

func handlePublishConfirm(messageStatus []*stream.ConfirmationStatus) {
go func() {
for _, msgStatus := range messageStatus {
if msgStatus.IsConfirmed() {
atomic.AddInt32(&confirmed, 1)
} else {
atomic.AddInt32(&fail, 1)
mutex.Lock()
unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage())
mutex.Unlock()
}

}
}()
}

func main() {
reader := bufio.NewReader(os.Stdin)

fmt.Println("HA producer example")
fmt.Println("Connecting to RabbitMQ streaming ...")
const messagesToSend = 20_000_000
const messagesToSend = 500_000
const numberOfProducers = 7

addresses := []string{
"rabbitmq-stream://guest:guest@localhost:5552/%2f",
"rabbitmq-stream://guest:guest@localhost:5552/%2f",
//"rabbitmq-stream://guest:guest@node1:5572/%2f",
//"rabbitmq-stream://guest:guest@node1:5572/%2f",
"rabbitmq-stream://guest:guest@localhost:5552/%2f"}

env, err := stream.NewEnvironment(
stream.NewEnvironmentOptions().
SetMaxProducersPerClient(4).
SetUris(addresses))
CheckErr(err)

Expand All @@ -70,42 +54,70 @@ func main() {
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)

rProducer, err := ha.NewReliableProducer(env,
streamName,
stream.NewProducerOptions().SetConfirmationTimeOut(5*time.Second), handlePublishConfirm)
CheckErr(err)
var sent int32
isRunning := true
go func() {
for isRunning {
totalHandled := atomic.LoadInt32(&confirmed) + atomic.LoadInt32(&fail)
fmt.Printf("%s - ToSend: %d - Sent:%d - Confirmed:%d - Not confirmed:%d - Total :%d \n",
time.Now().Format(time.RFC822), messagesToSend, sent, confirmed, fail, totalHandled)
time.Now().Format(time.RFC822), messagesToSend*numberOfProducers, sent, confirmed, fail, totalHandled)
time.Sleep(5 * time.Second)
}
}()
var producers []*ha.ReliableProducer
for i := 0; i < numberOfProducers; i++ {
var mutex = sync.Mutex{}
var unConfirmedMessages []message.StreamMessage
rProducer, err := ha.NewReliableProducer(env,
streamName,
stream.NewProducerOptions().SetConfirmationTimeOut(5*time.Second),
func(messageStatus []*stream.ConfirmationStatus) {
go func() {
for _, msgStatus := range messageStatus {
if msgStatus.IsConfirmed() {
atomic.AddInt32(&confirmed, 1)
} else {
atomic.AddInt32(&fail, 1)
mutex.Lock()
unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage())
mutex.Unlock()
}

for i := 0; i < messagesToSend; i++ {
msg := amqp.NewMessage([]byte("ha"))
mutex.Lock()
for _, confirmedMessage := range unConfirmedMessages {
err := rProducer.Send(confirmedMessage)
atomic.AddInt32(&sent, 1)
CheckErr(err)
}
unConfirmedMessages = []message.StreamMessage{}
mutex.Unlock()
err := rProducer.Send(msg)
atomic.AddInt32(&sent, 1)
}
}()
})
CheckErr(err)
producers = append(producers, rProducer)

go func() {
for i := 0; i < messagesToSend; i++ {
msg := amqp.NewMessage([]byte("ha"))
mutex.Lock()
for _, confirmedMessage := range unConfirmedMessages {
err := rProducer.Send(confirmedMessage)
atomic.AddInt32(&sent, 1)
CheckErr(err)
}
unConfirmedMessages = []message.StreamMessage{}
mutex.Unlock()
err := rProducer.Send(msg)
time.Sleep(1 * time.Millisecond)
atomic.AddInt32(&sent, 1)
CheckErr(err)
}
}()

}

fmt.Println("Terminated. Press enter to close the connections.")
_, _ = reader.ReadString('\n')
for _, producer := range producers {
err := producer.Close()
if err != nil {
CheckErr(err)
}
}
isRunning = false
err = rProducer.Close()
CheckErr(err)
err = env.Close()
CheckErr(err)
}
48 changes: 34 additions & 14 deletions pkg/ha/ha_publisher.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package ha

import (
"errors"
"fmt"
"github.com/pkg/errors"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
Expand Down Expand Up @@ -34,7 +34,7 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) {
// TODO: Convert the string to a constant
if event.Reason == "socket client closed" {
logs.LogError("[RProducer] - producer closed unexpectedly.. Reconnecting..")
err, reconnected := p.retry()
err, reconnected := p.retry(1)
if err != nil {
// TODO: Handle stream is not available
return
Expand Down Expand Up @@ -111,9 +111,9 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error {
}

if p.getStatus() == StatusReconnecting {
logs.LogDebug("[RProducer] - producer is reconnecting")
logs.LogDebug("[RProducer] - send producer is reconnecting")
<-p.reconnectionSignal
logs.LogDebug("[RProducer] - producer reconnected")
logs.LogDebug("[RProducer] - send producer reconnected")
}

p.mutex.Lock()
Expand All @@ -128,6 +128,7 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error {
return stream.FrameTooLarge
}
default:
time.Sleep(500 * time.Millisecond)
logs.LogError("[RProducer] - error during send %s", errW.Error())
}

Expand All @@ -136,22 +137,41 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error {
return nil
}

func (p *ReliableProducer) retry() (error, bool) {
func (p *ReliableProducer) retry(backoff int) (error, bool) {
p.setStatus(StatusReconnecting)
sleepValue := rand.Intn(int(p.producerOptions.ConfirmationTimeOut.Seconds()-2+1) + 2)
time.Sleep(time.Duration(sleepValue) * time.Second)
exists, errS := p.env.StreamExists(p.streamName)
if errS != nil {
sleepValue := rand.Intn(int((p.producerOptions.ConfirmationTimeOut.Seconds()-2+1)+2)*1000) + backoff*1000
logs.LogInfo("[RProducer] - The producer for the stream %s is in reconnection in %d milliseconds", p.streamName, sleepValue)
time.Sleep(time.Duration(sleepValue) * time.Millisecond)
streamMetaData, errS := p.env.StreamMetaData(p.streamName)
if errors.Is(errS, stream.StreamDoesNotExist) {
return errS, true

}
if exists {
logs.LogDebug("[RProducer] - stream %s exists. Reconnecting the producer.", p.streamName)
return p.newProducer(), true
if errors.Is(errS, stream.StreamNotAvailable) {
logs.LogInfo("[RProducer] - stream %s is not available. Trying to reconnect", p.streamName)
return p.retry(backoff + 1)
}
if streamMetaData.Leader == nil {
logs.LogInfo("[RProducer] - The leader for the stream %s is not ready. Trying to reconnect")
return p.retry(backoff + 1)
}

var result error
if streamMetaData != nil {
logs.LogInfo("[RProducer] - stream %s exists. Reconnecting the producer.", p.streamName)
result = p.newProducer()
if result == nil {
logs.LogInfo("[RProducer] - stream %s exists. Producer reconnected.", p.streamName)
} else {
logs.LogInfo("[RProducer] - error creating producer for the stream %s exists. Trying to reconnect", p.streamName)
return p.retry(backoff + 1)
}
} else {
logs.LogError("[RProducer] - stream %s does not exist. Closing..", p.streamName)
return stream.StreamDoesNotExist, true
result = stream.StreamDoesNotExist
}

return result, true

}

func (p *ReliableProducer) IsOpen() bool {
Expand Down
87 changes: 87 additions & 0 deletions pkg/ha/ha_publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package ha

import (
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
. "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"sync/atomic"
)

var _ = Describe("Reliable Producer", func() {

var (
envForRProducer *Environment
streamForRProducer string
)
BeforeEach(func() {
testEnv, err := NewEnvironment(nil)
envForRProducer = testEnv
Expect(err).NotTo(HaveOccurred())
streamForRProducer = uuid.New().String()
err = envForRProducer.DeclareStream(streamForRProducer, nil)
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
Expect(envForRProducer.DeleteStream(streamForRProducer)).NotTo(HaveOccurred())
})

It("Validate confirm handler", func() {
_, err := NewReliableProducer(envForRProducer,
streamForRProducer, &ProducerOptions{}, nil)
Expect(err).To(HaveOccurred())
})

It("Create/Confirm and close a Reliable Producer", func() {
signal := make(chan struct{})
var confirmed int32
producer, err := NewReliableProducer(envForRProducer,
streamForRProducer, NewProducerOptions(), func(messageConfirm []*ConfirmationStatus) {
for _, confirm := range messageConfirm {
Expect(confirm.IsConfirmed()).To(BeTrue())
}
if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 10 {
signal <- struct{}{}
}
})
Expect(err).NotTo(HaveOccurred())
for i := 0; i < 10; i++ {
msg := amqp.NewMessage([]byte("ha"))
err := producer.Send(msg)
Expect(err).NotTo(HaveOccurred())
}
<-signal
Expect(producer.Close()).NotTo(HaveOccurred())
})

//TODO: The test is commented out because it is not possible to kill the connection from the client side
// the client provider name is not exposed to the user.
// we need to expose it than kill the connection

//It("restart Reliable Producer in case of killing connection", func() {
// signal := make(chan struct{})
// var confirmed int32
// producer, err := NewReliableProducer(envForRProducer,
// streamForRProducer, &ProducerOptions{}, func(messageConfirm []*ConfirmationStatus) {
// for _, confirm := range messageConfirm {
// Expect(confirm.IsConfirmed()).To(BeTrue())
// }
// if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 10 {
// signal <- struct{}{}
// }
// })
// Expect(err).NotTo(HaveOccurred())
//
// // kill the connection
//
// for i := 0; i < 10; i++ {
// msg := amqp.NewMessage([]byte("ha"))
// err := producer.Send(msg)
// Expect(err).NotTo(HaveOccurred())
// }
// <-signal
// Expect(producer.Close()).NotTo(HaveOccurred())
//})

})
13 changes: 13 additions & 0 deletions pkg/ha/ha_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ha_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestHa(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Ha Suite")
}
Loading

0 comments on commit a503d8a

Please sign in to comment.