Skip to content

Commit

Permalink
Improve the ReliableProducer (#267)
Browse files Browse the repository at this point in the history
* Improve the ReliableProducer
* closes: #266
* part of: #265
* change the flushPendingMessages with more info
---------
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio authored Feb 26, 2024
1 parent da27e0b commit 3db652a
Show file tree
Hide file tree
Showing 14 changed files with 175 additions and 153 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ rabbitmq-ha-proxy:
mv compose/ha_tls/tls-gen/basic/result/server_*key.pem compose/ha_tls/tls-gen/basic/result/server_key.pem
cd compose/ha_tls; docker build -t haproxy-rabbitmq-cluster .
cd compose/ha_tls; docker-compose down
cd compose/ha_tls; docker-compose up -d
cd compose/ha_tls; docker-compose up

rabbitmq-server-tls:
cd compose/tls; rm -rf tls-gen;
Expand Down
13 changes: 10 additions & 3 deletions compose/ha_tls/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ services:
networks:
- back
hostname: node0
image: docker.io/rabbitmq:3.13-rc-management
image: rabbitmq:3.13-rc-management
pull_policy: always
ports:
- "5561:5551"
- "5562:5552"
- "5682:5672"
tty: true
volumes:
- ./conf/:/etc/rabbitmq/
Expand All @@ -22,10 +24,12 @@ services:
networks:
- back
hostname: node1
image: docker.io/rabbitmq:3.13-rc-management
image: rabbitmq:3.13-rc-management
pull_policy: always
ports:
- "5571:5551"
- "5572:5552"
- "5692:5672"
tty: true
volumes:
- ./conf/:/etc/rabbitmq/
Expand All @@ -37,10 +41,12 @@ services:
networks:
- back
hostname: node2
image: docker.io/rabbitmq:3.13-rc-management
image: rabbitmq:3.13-rc-management
pull_policy: always
ports:
- "5581:5551"
- "5582:5552"
- "5602:5672"
tty: true
volumes:
- ./conf/:/etc/rabbitmq/
Expand All @@ -52,6 +58,7 @@ services:
ports:
- "5553:5552"
- "5554:5551"
- "5674:5672"
- "15673:15672"
networks:
- back
Expand Down
7 changes: 7 additions & 0 deletions compose/ha_tls/haproxy.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ listen rabbitmq-stream
server rabbit_node1 rabbit_node1:5552 check inter 5000 fall 3
server rabbit_node2 rabbit_node2:5552 check inter 5000 fall 3

listen rabbitmq-amqp
bind 0.0.0.0:5672
balance roundrobin
server rabbit_node0 rabbit_node0:5672 check inter 5000 fall 3
server rabbit_node1 rabbit_node1:5672 check inter 5000 fall 3
server rabbit_node2 rabbit_node2:5672 check inter 5000 fall 3


listen rabbitmq-ui
bind 0.0.0.0:15672
Expand Down
112 changes: 39 additions & 73 deletions examples/haProducer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ package main
import (
"bufio"
"fmt"
"github.com/google/uuid"
"github.com/rabbitmq/rabbitmq-stream-go-client/examples/haProducer/http"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
Expand All @@ -26,21 +24,21 @@ func CheckErr(err error) {
}
}

var counter int32 = 0
var confirmed int32 = 0
var fail int32 = 0
var mutex = sync.Mutex{}
var unConfirmedMessages []message.StreamMessage

func handlePublishConfirm(messageStatus []*stream.ConfirmationStatus) {
go func() {
for _, message := range messageStatus {
if message.IsConfirmed() {

if atomic.AddInt32(&counter, 1)%20000 == 0 {
fmt.Printf("Confirmed %d messages\n", atomic.LoadInt32(&counter))
}
for _, msgStatus := range messageStatus {
if msgStatus.IsConfirmed() {
atomic.AddInt32(&confirmed, 1)
} else {
if atomic.AddInt32(&fail, 1)%20000 == 0 {
fmt.Printf("NOT Confirmed %d messages\n", atomic.LoadInt32(&fail))
}
atomic.AddInt32(&fail, 1)
mutex.Lock()
unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage())
mutex.Unlock()
}

}
Expand All @@ -52,6 +50,7 @@ func main() {

fmt.Println("HA producer example")
fmt.Println("Connecting to RabbitMQ streaming ...")
const messagesToSend = 20_000_000

addresses := []string{
"rabbitmq-stream://guest:guest@localhost:5552/%2f",
Expand All @@ -63,83 +62,50 @@ func main() {
SetUris(addresses))
CheckErr(err)

streamName := uuid.New().String()
streamName := "golang-reliable-producer-Test"
env.DeleteStream(streamName)

err = env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)

rProducer, err := ha.NewHAProducer(env, streamName, nil, handlePublishConfirm)
CheckErr(err)
rProducer1, err := ha.NewHAProducer(env, streamName, nil, handlePublishConfirm)
rProducer, err := ha.NewReliableProducer(env,
streamName,
stream.NewProducerOptions().SetConfirmationTimeOut(5*time.Second), handlePublishConfirm)
CheckErr(err)

wg := sync.WaitGroup{}

var sent int32
for i := 0; i < 10; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup) {
for i := 0; i < 100000; i++ {
msg := amqp.NewMessage([]byte("ha"))
err := rProducer.Send(msg)
CheckErr(err)
err = rProducer1.BatchSend([]message.StreamMessage{msg})
if atomic.AddInt32(&sent, 2)%20000 == 0 {
time.Sleep(100 * time.Millisecond)
fmt.Printf("Sent..%d messages\n", atomic.LoadInt32(&sent))
}
if err != nil {
break
}
}
wg.Done()
}(&wg)
}
isActive := true
isRunning := true
go func() {
for isActive {
coo, err := http.Connections("15672")
if err != nil {
return
}

for _, connection := range coo {
_ = http.DropConnection(connection.Name, "15672")
}
time.Sleep(2 * time.Second)
for isRunning {
totalHandled := atomic.LoadInt32(&confirmed) + atomic.LoadInt32(&fail)
fmt.Printf("%s - ToSend: %d - Sent:%d - Confirmed:%d - Not confirmed:%d - Total :%d \n",
time.Now().Format(time.RFC822), messagesToSend, sent, confirmed, fail, totalHandled)
time.Sleep(5 * time.Second)
}
}()

wg.Wait()
isActive = false
time.Sleep(2 * time.Second)

fmt.Println("Terminated. Press any key to see the report. ")
_, _ = reader.ReadString('\n')
time.Sleep(200 * time.Millisecond)
totalHandled := atomic.LoadInt32(&counter) + atomic.LoadInt32(&fail)
fmt.Printf("[Report]\n - Sent:%d \n - Confirmed:%d\n - Not confirmed:%d\n - Total messages handeld:%d \n",
sent, counter, fail, totalHandled)
if sent == totalHandled {
fmt.Printf(" - Messages sent %d match with handled: %d! yea! \n\n", sent, totalHandled)
}

if totalHandled > sent {
fmt.Printf(" - Messages sent %d are lower than handled: %d! some duplication, can happens ! \n\n", sent, totalHandled)
}

if sent > totalHandled {
fmt.Printf(" - Messages handled %d are lower than send: %d! that's not good!\n\n", totalHandled, sent)
for i := 0; i < messagesToSend; i++ {
msg := amqp.NewMessage([]byte("ha"))
mutex.Lock()
for _, confirmedMessage := range unConfirmedMessages {
err := rProducer.Send(confirmedMessage)
atomic.AddInt32(&sent, 1)
CheckErr(err)
}
unConfirmedMessages = []message.StreamMessage{}
mutex.Unlock()
err := rProducer.Send(msg)
atomic.AddInt32(&sent, 1)
CheckErr(err)
}

fmt.Println("Terminated. Press enter to close the connections.")
_, _ = reader.ReadString('\n')
isRunning = false
err = rProducer.Close()
CheckErr(err)
err = rProducer1.Close()
CheckErr(err)
err = env.DeleteStream(streamName)
CheckErr(err)
err = env.Close()
CheckErr(err)
}
1 change: 1 addition & 0 deletions generate/generate_amqp10_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func main() {

msg := amqp.NewMessage([]byte(""))
binary, err := msg.MarshalBinary()
//msg.UnmarshalBinary()
if err != nil {
return
}
Expand Down
8 changes: 5 additions & 3 deletions perfTest/cmd/silent.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func startPublisher(streamName string) error {
logInfo("Enable SubEntrySize: %d, compression: %s", subEntrySize, cp)
}

rPublisher, err := ha.NewHAProducer(simulEnvironment,
rPublisher, err := ha.NewReliableProducer(simulEnvironment,
streamName,
producerOptions,
handlePublishConfirms)
Expand Down Expand Up @@ -315,9 +315,11 @@ func startPublisher(streamName string) error {
}

atomic.AddInt64(&messagesSent, int64(len(arr)))
err = prod.BatchSend(arr)
for _, streamMessage := range arr {
err = prod.Send(streamMessage)
checkErr(err)
}
atomic.AddInt32(&publisherMessageCount, int32(len(arr)))
checkErr(err)

}
}(rPublisher, arr)
Expand Down
Loading

0 comments on commit 3db652a

Please sign in to comment.