diff --git a/compose/ha_tls/docker-compose.yml b/compose/ha_tls/docker-compose.yml index ecd5f2fc..4f719f11 100644 --- a/compose/ha_tls/docker-compose.yml +++ b/compose/ha_tls/docker-compose.yml @@ -7,7 +7,7 @@ services: networks: - back hostname: node0 - image: rabbitmq:3.13-rc-management + image: rabbitmq:3.13-management pull_policy: always ports: - "5561:5551" @@ -24,7 +24,7 @@ services: networks: - back hostname: node1 - image: rabbitmq:3.13-rc-management + image: rabbitmq:3.13-management pull_policy: always ports: - "5571:5551" @@ -41,7 +41,7 @@ services: networks: - back hostname: node2 - image: rabbitmq:3.13-rc-management + image: rabbitmq:3.13-management pull_policy: always ports: - "5581:5551" diff --git a/examples/haProducer/producer.go b/examples/haProducer/producer.go index c4d092d0..30c825a2 100644 --- a/examples/haProducer/producer.go +++ b/examples/haProducer/producer.go @@ -26,39 +26,23 @@ func CheckErr(err error) { var confirmed int32 = 0 var fail int32 = 0 -var mutex = sync.Mutex{} -var unConfirmedMessages []message.StreamMessage - -func handlePublishConfirm(messageStatus []*stream.ConfirmationStatus) { - go func() { - for _, msgStatus := range messageStatus { - if msgStatus.IsConfirmed() { - atomic.AddInt32(&confirmed, 1) - } else { - atomic.AddInt32(&fail, 1) - mutex.Lock() - unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage()) - mutex.Unlock() - } - - } - }() -} func main() { reader := bufio.NewReader(os.Stdin) fmt.Println("HA producer example") fmt.Println("Connecting to RabbitMQ streaming ...") - const messagesToSend = 20_000_000 + const messagesToSend = 500_000 + const numberOfProducers = 7 addresses := []string{ - "rabbitmq-stream://guest:guest@localhost:5552/%2f", - "rabbitmq-stream://guest:guest@localhost:5552/%2f", + //"rabbitmq-stream://guest:guest@node1:5572/%2f", + //"rabbitmq-stream://guest:guest@node1:5572/%2f", "rabbitmq-stream://guest:guest@localhost:5552/%2f"} env, err := stream.NewEnvironment( stream.NewEnvironmentOptions(). + SetMaxProducersPerClient(4). SetUris(addresses)) CheckErr(err) @@ -70,42 +54,70 @@ func main() { MaxLengthBytes: stream.ByteCapacity{}.GB(2), }, ) - - rProducer, err := ha.NewReliableProducer(env, - streamName, - stream.NewProducerOptions().SetConfirmationTimeOut(5*time.Second), handlePublishConfirm) - CheckErr(err) var sent int32 isRunning := true go func() { for isRunning { totalHandled := atomic.LoadInt32(&confirmed) + atomic.LoadInt32(&fail) fmt.Printf("%s - ToSend: %d - Sent:%d - Confirmed:%d - Not confirmed:%d - Total :%d \n", - time.Now().Format(time.RFC822), messagesToSend, sent, confirmed, fail, totalHandled) + time.Now().Format(time.RFC822), messagesToSend*numberOfProducers, sent, confirmed, fail, totalHandled) time.Sleep(5 * time.Second) } }() + var producers []*ha.ReliableProducer + for i := 0; i < numberOfProducers; i++ { + var mutex = sync.Mutex{} + var unConfirmedMessages []message.StreamMessage + rProducer, err := ha.NewReliableProducer(env, + streamName, + stream.NewProducerOptions().SetConfirmationTimeOut(5*time.Second), + func(messageStatus []*stream.ConfirmationStatus) { + go func() { + for _, msgStatus := range messageStatus { + if msgStatus.IsConfirmed() { + atomic.AddInt32(&confirmed, 1) + } else { + atomic.AddInt32(&fail, 1) + mutex.Lock() + unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage()) + mutex.Unlock() + } - for i := 0; i < messagesToSend; i++ { - msg := amqp.NewMessage([]byte("ha")) - mutex.Lock() - for _, confirmedMessage := range unConfirmedMessages { - err := rProducer.Send(confirmedMessage) - atomic.AddInt32(&sent, 1) - CheckErr(err) - } - unConfirmedMessages = []message.StreamMessage{} - mutex.Unlock() - err := rProducer.Send(msg) - atomic.AddInt32(&sent, 1) + } + }() + }) CheckErr(err) + producers = append(producers, rProducer) + + go func() { + for i := 0; i < messagesToSend; i++ { + msg := amqp.NewMessage([]byte("ha")) + mutex.Lock() + for _, confirmedMessage := range unConfirmedMessages { + err := rProducer.Send(confirmedMessage) + atomic.AddInt32(&sent, 1) + CheckErr(err) + } + unConfirmedMessages = []message.StreamMessage{} + mutex.Unlock() + err := rProducer.Send(msg) + time.Sleep(1 * time.Millisecond) + atomic.AddInt32(&sent, 1) + CheckErr(err) + } + }() + } fmt.Println("Terminated. Press enter to close the connections.") _, _ = reader.ReadString('\n') + for _, producer := range producers { + err := producer.Close() + if err != nil { + CheckErr(err) + } + } isRunning = false - err = rProducer.Close() - CheckErr(err) err = env.Close() CheckErr(err) } diff --git a/pkg/ha/ha_publisher.go b/pkg/ha/ha_publisher.go index 9e7f6e45..bc3b27d8 100644 --- a/pkg/ha/ha_publisher.go +++ b/pkg/ha/ha_publisher.go @@ -1,8 +1,8 @@ package ha import ( + "errors" "fmt" - "github.com/pkg/errors" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" @@ -34,7 +34,7 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) { // TODO: Convert the string to a constant if event.Reason == "socket client closed" { logs.LogError("[RProducer] - producer closed unexpectedly.. Reconnecting..") - err, reconnected := p.retry() + err, reconnected := p.retry(1) if err != nil { // TODO: Handle stream is not available return @@ -111,9 +111,9 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error { } if p.getStatus() == StatusReconnecting { - logs.LogDebug("[RProducer] - producer is reconnecting") + logs.LogDebug("[RProducer] - send producer is reconnecting") <-p.reconnectionSignal - logs.LogDebug("[RProducer] - producer reconnected") + logs.LogDebug("[RProducer] - send producer reconnected") } p.mutex.Lock() @@ -128,6 +128,7 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error { return stream.FrameTooLarge } default: + time.Sleep(500 * time.Millisecond) logs.LogError("[RProducer] - error during send %s", errW.Error()) } @@ -136,22 +137,41 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error { return nil } -func (p *ReliableProducer) retry() (error, bool) { +func (p *ReliableProducer) retry(backoff int) (error, bool) { p.setStatus(StatusReconnecting) - sleepValue := rand.Intn(int(p.producerOptions.ConfirmationTimeOut.Seconds()-2+1) + 2) - time.Sleep(time.Duration(sleepValue) * time.Second) - exists, errS := p.env.StreamExists(p.streamName) - if errS != nil { + sleepValue := rand.Intn(int((p.producerOptions.ConfirmationTimeOut.Seconds()-2+1)+2)*1000) + backoff*1000 + logs.LogInfo("[RProducer] - The producer for the stream %s is in reconnection in %d milliseconds", p.streamName, sleepValue) + time.Sleep(time.Duration(sleepValue) * time.Millisecond) + streamMetaData, errS := p.env.StreamMetaData(p.streamName) + if errors.Is(errS, stream.StreamDoesNotExist) { return errS, true - } - if exists { - logs.LogDebug("[RProducer] - stream %s exists. Reconnecting the producer.", p.streamName) - return p.newProducer(), true + if errors.Is(errS, stream.StreamNotAvailable) { + logs.LogInfo("[RProducer] - stream %s is not available. Trying to reconnect", p.streamName) + return p.retry(backoff + 1) + } + if streamMetaData.Leader == nil { + logs.LogInfo("[RProducer] - The leader for the stream %s is not ready. Trying to reconnect") + return p.retry(backoff + 1) + } + + var result error + if streamMetaData != nil { + logs.LogInfo("[RProducer] - stream %s exists. Reconnecting the producer.", p.streamName) + result = p.newProducer() + if result == nil { + logs.LogInfo("[RProducer] - stream %s exists. Producer reconnected.", p.streamName) + } else { + logs.LogInfo("[RProducer] - error creating producer for the stream %s exists. Trying to reconnect", p.streamName) + return p.retry(backoff + 1) + } } else { logs.LogError("[RProducer] - stream %s does not exist. Closing..", p.streamName) - return stream.StreamDoesNotExist, true + result = stream.StreamDoesNotExist } + + return result, true + } func (p *ReliableProducer) IsOpen() bool { diff --git a/pkg/ha/ha_publisher_test.go b/pkg/ha/ha_publisher_test.go new file mode 100644 index 00000000..4d7fefe3 --- /dev/null +++ b/pkg/ha/ha_publisher_test.go @@ -0,0 +1,87 @@ +package ha + +import ( + "github.com/google/uuid" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + . "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" + "sync/atomic" +) + +var _ = Describe("Reliable Producer", func() { + + var ( + envForRProducer *Environment + streamForRProducer string + ) + BeforeEach(func() { + testEnv, err := NewEnvironment(nil) + envForRProducer = testEnv + Expect(err).NotTo(HaveOccurred()) + streamForRProducer = uuid.New().String() + err = envForRProducer.DeclareStream(streamForRProducer, nil) + Expect(err).NotTo(HaveOccurred()) + }) + AfterEach(func() { + Expect(envForRProducer.DeleteStream(streamForRProducer)).NotTo(HaveOccurred()) + }) + + It("Validate confirm handler", func() { + _, err := NewReliableProducer(envForRProducer, + streamForRProducer, &ProducerOptions{}, nil) + Expect(err).To(HaveOccurred()) + }) + + It("Create/Confirm and close a Reliable Producer", func() { + signal := make(chan struct{}) + var confirmed int32 + producer, err := NewReliableProducer(envForRProducer, + streamForRProducer, NewProducerOptions(), func(messageConfirm []*ConfirmationStatus) { + for _, confirm := range messageConfirm { + Expect(confirm.IsConfirmed()).To(BeTrue()) + } + if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 10 { + signal <- struct{}{} + } + }) + Expect(err).NotTo(HaveOccurred()) + for i := 0; i < 10; i++ { + msg := amqp.NewMessage([]byte("ha")) + err := producer.Send(msg) + Expect(err).NotTo(HaveOccurred()) + } + <-signal + Expect(producer.Close()).NotTo(HaveOccurred()) + }) + + //TODO: The test is commented out because it is not possible to kill the connection from the client side + // the client provider name is not exposed to the user. + // we need to expose it than kill the connection + + //It("restart Reliable Producer in case of killing connection", func() { + // signal := make(chan struct{}) + // var confirmed int32 + // producer, err := NewReliableProducer(envForRProducer, + // streamForRProducer, &ProducerOptions{}, func(messageConfirm []*ConfirmationStatus) { + // for _, confirm := range messageConfirm { + // Expect(confirm.IsConfirmed()).To(BeTrue()) + // } + // if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 10 { + // signal <- struct{}{} + // } + // }) + // Expect(err).NotTo(HaveOccurred()) + // + // // kill the connection + // + // for i := 0; i < 10; i++ { + // msg := amqp.NewMessage([]byte("ha")) + // err := producer.Send(msg) + // Expect(err).NotTo(HaveOccurred()) + // } + // <-signal + // Expect(producer.Close()).NotTo(HaveOccurred()) + //}) + +}) diff --git a/pkg/ha/ha_suite_test.go b/pkg/ha/ha_suite_test.go new file mode 100644 index 00000000..6e76f433 --- /dev/null +++ b/pkg/ha/ha_suite_test.go @@ -0,0 +1,13 @@ +package ha_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestHa(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Ha Suite") +} diff --git a/pkg/ha/http_utils_test.go b/pkg/ha/http_utils_test.go new file mode 100644 index 00000000..abf621e1 --- /dev/null +++ b/pkg/ha/http_utils_test.go @@ -0,0 +1,89 @@ +package ha + +import ( + "encoding/json" + "github.com/pkg/errors" + "io/ioutil" + "net/http" + "strconv" +) + +type queue struct { + Messages int `json:"messages"` +} + +type connection struct { + Name string `json:"name"` +} + +func messagesReady(queueName string, port string) (int, error) { + bodyString, err := httpGet("http://localhost:"+port+"/api/queues/%2F/"+queueName, "guest", "guest") + if err != nil { + return 0, err + } + + var data queue + err = json.Unmarshal([]byte(bodyString), &data) + if err != nil { + return 0, err + } + return data.Messages, nil +} + +func Connections(port string) ([]connection, error) { + bodyString, err := httpGet("http://localhost:"+port+"/api/connections/", "guest", "guest") + if err != nil { + return nil, err + } + + var data []connection + err = json.Unmarshal([]byte(bodyString), &data) + if err != nil { + return nil, err + } + return data, nil +} + +func DropConnection(name string, port string) error { + _, err := httpDelete("http://localhost:"+port+"/api/connections/"+name, "guest", "guest") + if err != nil { + return err + } + + return nil +} +func httpGet(url, username, password string) (string, error) { + return baseCall(url, username, password, "GET") +} + +func httpDelete(url, username, password string) (string, error) { + return baseCall(url, username, password, "DELETE") +} + +func baseCall(url, username, password string, method string) (string, error) { + var client http.Client + req, err := http.NewRequest(method, url, nil) + if err != nil { + return "", err + } + req.SetBasicAuth(username, password) + + resp, err3 := client.Do(req) + + if err3 != nil { + return "", err3 + } + + defer resp.Body.Close() + + if resp.StatusCode == 200 { // OK + bodyBytes, err2 := ioutil.ReadAll(resp.Body) + if err2 != nil { + return "", err2 + } + return string(bodyBytes), nil + + } + return "", errors.New(strconv.Itoa(resp.StatusCode)) + +} diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 7867b51a..4a24af65 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -143,7 +143,11 @@ func (c *Client) connect() error { c.tuneState.requestedHeartbeat = int(c.tcpParameters.RequestedHeartbeat.Seconds()) servAddr := net.JoinHostPort(host, port) - tcpAddr, _ := net.ResolveTCPAddr("tcp", servAddr) + tcpAddr, errorResolve := net.ResolveTCPAddr("tcp", servAddr) + if errorResolve != nil { + logs.LogDebug("Resolve error %s", errorResolve) + return errorResolve + } connection, errorConnection := net.DialTCP("tcp", nil, tcpAddr) if errorConnection != nil { logs.LogDebug("%s", errorConnection) diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 34bfe76d..a8044eaf 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -87,9 +87,12 @@ func (env *Environment) newReconnectClient() (*Client, error) { err := client.connect() tentatives := 1 for err != nil { - logs.LogError("Can't connect the locator client, error:%s, retry in %d seconds, broker: ", err, tentatives, + sleepTime := rand.Intn(5000) + (tentatives * 1000) + + logs.LogError("Can't connect the locator client, error:%s, retry in %d milliseconds, broker: ", err, sleepTime, client.broker) - time.Sleep(time.Duration(tentatives) * time.Second) + + time.Sleep(time.Duration(sleepTime) * time.Millisecond) rand.Seed(time.Now().UnixNano()) n := rand.Intn(len(env.options.ConnectionParameters)) client = newClient("stream-locator", env.options.ConnectionParameters[n], env.options.TCPParameters, diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index 6afba83d..e4144fc9 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -225,7 +225,6 @@ var _ = Describe("Environment test", func() { }) Describe("Environment Validations", func() { - _, err := NewEnvironment(NewEnvironmentOptions(). SetMaxConsumersPerClient(0). SetMaxProducersPerClient(0)) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 7c219111..dd384308 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -291,7 +291,6 @@ func (producer *Producer) startPublishTask() { producer.sendBufferedMessages() producer.mutexPending.Unlock() } - } }(producer.messageSequenceCh) @@ -317,6 +316,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { publishingId: sequence, } } else { + // TODO: Change the error message with a typed error return fmt.Errorf("producer id: %d closed", producer.id) }