Skip to content

Commit 82ea726

Browse files
authored
Merge pull request #39 from ochom/dev
update test to bytes
2 parents 665135b + 5ba31f6 commit 82ea726

File tree

5 files changed

+117
-21
lines changed

5 files changed

+117
-21
lines changed

helpers/json.go

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func ToBytes(payload any) []byte {
2525
// Marshal the payload to JSON.
2626
bytesPayload, err := json.Marshal(&payload)
2727
if err != nil {
28+
logs.Error("Failed to marshal JSON: %s", err.Error())
2829
return nil
2930
}
3031

helpers/json_test.go

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package helpers
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
)
7+
8+
func TestToBytes(t *testing.T) {
9+
type args struct {
10+
payload any
11+
}
12+
tests := []struct {
13+
name string
14+
args args
15+
want []byte
16+
}{
17+
{
18+
name: "test 1",
19+
args: args{
20+
payload: map[string]string{
21+
"hello": "world",
22+
},
23+
},
24+
want: []byte(`{"hello":"world"}`),
25+
},
26+
{
27+
name: "test 2",
28+
args: args{
29+
payload: 1,
30+
},
31+
want: []byte(`1`),
32+
},
33+
{
34+
name: "test 2",
35+
args: args{
36+
payload: nil,
37+
},
38+
want: nil,
39+
},
40+
{
41+
name: "test 2",
42+
args: args{
43+
payload: "hello world",
44+
},
45+
want: []byte(`hello world`),
46+
},
47+
}
48+
for _, tt := range tests {
49+
t.Run(tt.name, func(t *testing.T) {
50+
if got := ToBytes(tt.args.payload); !reflect.DeepEqual(got, tt.want) {
51+
t.Errorf("ToBytes() = %v, want %v", got, tt.want)
52+
}
53+
})
54+
}
55+
}
56+
57+
func TestFromBytes(t *testing.T) {
58+
t.Run("test 1", func(t *testing.T) {
59+
got := FromBytes[string](nil)
60+
if got != "" {
61+
t.Errorf("FromBytes() = %v, want %v", got, "")
62+
}
63+
})
64+
65+
t.Run("test 2", func(t *testing.T) {
66+
got := FromBytes[int](ToBytes(1))
67+
if got != 1 {
68+
t.Errorf("FromBytes() = %v, want %v", got, 1)
69+
}
70+
})
71+
72+
t.Run("test 3", func(t *testing.T) {
73+
got := FromBytes[map[string]string](ToBytes(map[string]string{
74+
"hello": "world",
75+
}))
76+
if reflect.TypeOf(got).Kind() != reflect.Map || got["hello"] != "world" {
77+
t.Errorf("FromBytes() = %v, want %v", got, `{"hello":"world"}`)
78+
}
79+
})
80+
}

pubsub/consumer.go

+15-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package pubsub
22

3-
import "fmt"
3+
import (
4+
"fmt"
5+
)
46

57
// Consumer ...
68
type Consumer struct {
@@ -40,19 +42,18 @@ func NewConsumer(rabbitURL, exchange, queue string, config ...Config) *Consumer
4042
}
4143

4244
// Consume consume messages from the channels
43-
func (c *Consumer) Consume(workerFunc func([]byte)) error {
44-
conn, ch, err := initQ(c.url)
45+
func (c *Consumer) Consume(stop chan bool, workerFunc func([]byte)) error {
46+
ps, err := initQ(c.url)
4547
if err != nil {
4648
return fmt.Errorf("failed to initialize a connection: %s", err.Error())
4749
}
48-
defer ch.Close()
49-
defer conn.Close()
50+
defer ps.Close()
5051

51-
if err := initPubSub(ch, c.exchange, c.queue, string(c.config.Type)); err != nil {
52+
if err := initPubSub(ps.ch, c.exchange, c.queue, string(c.config.Type)); err != nil {
5253
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
5354
}
5455

55-
deliveries, err := ch.Consume(
56+
deliveries, err := ps.ch.Consume(
5657
c.queue, // queue
5758
c.config.Tag, // consumerTag
5859
c.config.AutoAck, // auto-ack
@@ -66,9 +67,12 @@ func (c *Consumer) Consume(workerFunc func([]byte)) error {
6667
return fmt.Errorf("failed to consume messages: %s", err.Error())
6768
}
6869

69-
for d := range deliveries {
70-
workerFunc(d.Body)
70+
for {
71+
select {
72+
case <-stop:
73+
return fmt.Errorf("stop signal received")
74+
case message := <-deliveries:
75+
workerFunc(message.Body)
76+
}
7177
}
72-
73-
return nil
7478
}

pubsub/publisher.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,18 @@ func NewPublisher(rabbitURL, exchange, queue string, exchangeType ExchangeType)
2222

2323
// publish ...
2424
func (p *publisher) publish(body []byte, delay time.Duration) error {
25-
conn, ch, err := initQ(p.url)
25+
ps, err := initQ(p.url)
2626
if err != nil {
2727
return fmt.Errorf("failed to initialize a connection: %s", err.Error())
2828
}
29-
defer ch.Close()
30-
defer conn.Close()
29+
defer ps.Close()
3130

32-
if err := initPubSub(ch, p.exchange, p.queue, p.exchangeType); err != nil {
31+
if err := initPubSub(ps.ch, p.exchange, p.queue, p.exchangeType); err != nil {
3332
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
3433
}
3534

3635
// publish message to exchange
37-
err = ch.Publish(
36+
err = ps.ch.Publish(
3837
p.exchange, // exchange
3938
p.queue, // routing key
4039
true, // mandatory

pubsub/pubsub.go

+17-5
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,35 @@ var (
1717
Delayed ExchangeType = "x-delayed-message"
1818
)
1919

20-
func initQ(url string) (*amqp.Connection, *amqp.Channel, error) {
20+
// Pubsub ...
21+
type Pubsub struct {
22+
conn *amqp.Connection
23+
ch *amqp.Channel
24+
}
25+
26+
// Close ...
27+
func (p *Pubsub) Close() {
28+
_ = p.conn.Close()
29+
_ = p.ch.Close()
30+
}
31+
32+
func initQ(url string) (*Pubsub, error) {
2133
conn, err := amqp.Dial(url)
2234
if err != nil {
23-
return nil, nil, fmt.Errorf("failed to connect to RabbitMQ: %s", err.Error())
35+
return nil, fmt.Errorf("failed to connect to RabbitMQ: %s", err.Error())
2436
}
2537

2638
ch, err := conn.Channel()
2739
if err != nil {
28-
return nil, nil, fmt.Errorf("failed to open a channel: %s", err.Error())
40+
return nil, fmt.Errorf("failed to open a channel: %s", err.Error())
2941
}
3042

3143
err = ch.Qos(10, 0, false) // fair dispatch
3244
if err != nil {
33-
return nil, nil, fmt.Errorf("failed to set QoS: %s", err.Error())
45+
return nil, fmt.Errorf("failed to set QoS: %s", err.Error())
3446
}
3547

36-
return conn, ch, nil
48+
return &Pubsub{conn: conn, ch: ch}, nil
3749
}
3850

3951
// initPubSub ...

0 commit comments

Comments
 (0)