Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update test to bytes #39

Merged
merged 3 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions helpers/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func ToBytes(payload any) []byte {
// Marshal the payload to JSON.
bytesPayload, err := json.Marshal(&payload)
if err != nil {
logs.Error("Failed to marshal JSON: %s", err.Error())
return nil
}

Expand Down
80 changes: 80 additions & 0 deletions helpers/json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package helpers

import (
"reflect"
"testing"
)

func TestToBytes(t *testing.T) {
type args struct {
payload any
}
tests := []struct {
name string
args args
want []byte
}{
{
name: "test 1",
args: args{
payload: map[string]string{
"hello": "world",
},
},
want: []byte(`{"hello":"world"}`),
},
{
name: "test 2",
args: args{
payload: 1,
},
want: []byte(`1`),
},
{
name: "test 2",
args: args{
payload: nil,
},
want: nil,
},
{
name: "test 2",
args: args{
payload: "hello world",
},
want: []byte(`hello world`),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := ToBytes(tt.args.payload); !reflect.DeepEqual(got, tt.want) {
t.Errorf("ToBytes() = %v, want %v", got, tt.want)
}
})
}
}

func TestFromBytes(t *testing.T) {
t.Run("test 1", func(t *testing.T) {
got := FromBytes[string](nil)
if got != "" {
t.Errorf("FromBytes() = %v, want %v", got, "")
}
})

t.Run("test 2", func(t *testing.T) {
got := FromBytes[int](ToBytes(1))
if got != 1 {
t.Errorf("FromBytes() = %v, want %v", got, 1)
}
})

t.Run("test 3", func(t *testing.T) {
got := FromBytes[map[string]string](ToBytes(map[string]string{
"hello": "world",
}))
if reflect.TypeOf(got).Kind() != reflect.Map || got["hello"] != "world" {
t.Errorf("FromBytes() = %v, want %v", got, `{"hello":"world"}`)
}
})
}
26 changes: 15 additions & 11 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pubsub

import "fmt"
import (
"fmt"
)

// Consumer ...
type Consumer struct {
Expand Down Expand Up @@ -40,19 +42,18 @@ func NewConsumer(rabbitURL, exchange, queue string, config ...Config) *Consumer
}

// Consume consume messages from the channels
func (c *Consumer) Consume(workerFunc func([]byte)) error {
conn, ch, err := initQ(c.url)
func (c *Consumer) Consume(stop chan bool, workerFunc func([]byte)) error {
ps, err := initQ(c.url)
if err != nil {
return fmt.Errorf("failed to initialize a connection: %s", err.Error())
}
defer ch.Close()
defer conn.Close()
defer ps.Close()

if err := initPubSub(ch, c.exchange, c.queue, string(c.config.Type)); err != nil {
if err := initPubSub(ps.ch, c.exchange, c.queue, string(c.config.Type)); err != nil {
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
}

deliveries, err := ch.Consume(
deliveries, err := ps.ch.Consume(
c.queue, // queue
c.config.Tag, // consumerTag
c.config.AutoAck, // auto-ack
Expand All @@ -66,9 +67,12 @@ func (c *Consumer) Consume(workerFunc func([]byte)) error {
return fmt.Errorf("failed to consume messages: %s", err.Error())
}

for d := range deliveries {
workerFunc(d.Body)
for {
select {
case <-stop:
return fmt.Errorf("stop signal received")
case message := <-deliveries:
workerFunc(message.Body)
}
}

return nil
}
9 changes: 4 additions & 5 deletions pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@ func NewPublisher(rabbitURL, exchange, queue string, exchangeType ExchangeType)

// publish ...
func (p *publisher) publish(body []byte, delay time.Duration) error {
conn, ch, err := initQ(p.url)
ps, err := initQ(p.url)
if err != nil {
return fmt.Errorf("failed to initialize a connection: %s", err.Error())
}
defer ch.Close()
defer conn.Close()
defer ps.Close()

if err := initPubSub(ch, p.exchange, p.queue, p.exchangeType); err != nil {
if err := initPubSub(ps.ch, p.exchange, p.queue, p.exchangeType); err != nil {
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
}

// publish message to exchange
err = ch.Publish(
err = ps.ch.Publish(
p.exchange, // exchange
p.queue, // routing key
true, // mandatory
Expand Down
22 changes: 17 additions & 5 deletions pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,35 @@ var (
Delayed ExchangeType = "x-delayed-message"
)

func initQ(url string) (*amqp.Connection, *amqp.Channel, error) {
// Pubsub ...
type Pubsub struct {
conn *amqp.Connection
ch *amqp.Channel
}

// Close ...
func (p *Pubsub) Close() {
_ = p.conn.Close()
_ = p.ch.Close()
}

func initQ(url string) (*Pubsub, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, nil, fmt.Errorf("failed to connect to RabbitMQ: %s", err.Error())
return nil, fmt.Errorf("failed to connect to RabbitMQ: %s", err.Error())
}

ch, err := conn.Channel()
if err != nil {
return nil, nil, fmt.Errorf("failed to open a channel: %s", err.Error())
return nil, fmt.Errorf("failed to open a channel: %s", err.Error())
}

err = ch.Qos(10, 0, false) // fair dispatch
if err != nil {
return nil, nil, fmt.Errorf("failed to set QoS: %s", err.Error())
return nil, fmt.Errorf("failed to set QoS: %s", err.Error())
}

return conn, ch, nil
return &Pubsub{conn: conn, ch: ch}, nil
}

// initPubSub ...
Expand Down