Skip to content

Commit 4c27920

Browse files
committed
update pubsub consume with stop signal
1 parent 62cb755 commit 4c27920

File tree

3 files changed

+41
-21
lines changed

3 files changed

+41
-21
lines changed

pubsub/consumer.go

+15-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package pubsub
22

3-
import "fmt"
3+
import (
4+
"fmt"
5+
)
46

57
// Consumer ...
68
type Consumer struct {
@@ -40,19 +42,18 @@ func NewConsumer(rabbitURL, exchange, queue string, config ...Config) *Consumer
4042
}
4143

4244
// Consume consume messages from the channels
43-
func (c *Consumer) Consume(workerFunc func([]byte)) error {
44-
conn, ch, err := initQ(c.url)
45+
func (c *Consumer) Consume(stop chan bool, workerFunc func([]byte)) error {
46+
ps, err := initQ(c.url)
4547
if err != nil {
4648
return fmt.Errorf("failed to initialize a connection: %s", err.Error())
4749
}
48-
defer ch.Close()
49-
defer conn.Close()
50+
defer ps.Close()
5051

51-
if err := initPubSub(ch, c.exchange, c.queue, string(c.config.Type)); err != nil {
52+
if err := initPubSub(ps.ch, c.exchange, c.queue, string(c.config.Type)); err != nil {
5253
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
5354
}
5455

55-
deliveries, err := ch.Consume(
56+
deliveries, err := ps.ch.Consume(
5657
c.queue, // queue
5758
c.config.Tag, // consumerTag
5859
c.config.AutoAck, // auto-ack
@@ -66,9 +67,12 @@ func (c *Consumer) Consume(workerFunc func([]byte)) error {
6667
return fmt.Errorf("failed to consume messages: %s", err.Error())
6768
}
6869

69-
for d := range deliveries {
70-
workerFunc(d.Body)
70+
for {
71+
select {
72+
case <-stop:
73+
return fmt.Errorf("stop signal received")
74+
case message := <-deliveries:
75+
workerFunc(message.Body)
76+
}
7177
}
72-
73-
return nil
7478
}

pubsub/publisher.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,18 @@ func NewPublisher(rabbitURL, exchange, queue string, exchangeType ExchangeType)
2222

2323
// publish ...
2424
func (p *publisher) publish(body []byte, delay time.Duration) error {
25-
conn, ch, err := initQ(p.url)
25+
ps, err := initQ(p.url)
2626
if err != nil {
2727
return fmt.Errorf("failed to initialize a connection: %s", err.Error())
2828
}
29-
defer ch.Close()
30-
defer conn.Close()
29+
defer ps.Close()
3130

32-
if err := initPubSub(ch, p.exchange, p.queue, p.exchangeType); err != nil {
31+
if err := initPubSub(ps.ch, p.exchange, p.queue, p.exchangeType); err != nil {
3332
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
3433
}
3534

3635
// publish message to exchange
37-
err = ch.Publish(
36+
err = ps.ch.Publish(
3837
p.exchange, // exchange
3938
p.queue, // routing key
4039
true, // mandatory

pubsub/pubsub.go

+22-5
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,40 @@ var (
1717
Delayed ExchangeType = "x-delayed-message"
1818
)
1919

20-
func initQ(url string) (*amqp.Connection, *amqp.Channel, error) {
20+
// Pubsub ...
21+
type Pubsub struct {
22+
conn *amqp.Connection
23+
ch *amqp.Channel
24+
}
25+
26+
// Close ...
27+
func (p *Pubsub) Close() {
28+
if err := p.conn.Close(); err != nil {
29+
fmt.Println("failed to close connection: ", err.Error())
30+
}
31+
32+
if err := p.ch.Close(); err != nil {
33+
fmt.Println("failed to close channel: ", err.Error())
34+
}
35+
}
36+
37+
func initQ(url string) (*Pubsub, error) {
2138
conn, err := amqp.Dial(url)
2239
if err != nil {
23-
return nil, nil, fmt.Errorf("failed to connect to RabbitMQ: %s", err.Error())
40+
return nil, fmt.Errorf("failed to connect to RabbitMQ: %s", err.Error())
2441
}
2542

2643
ch, err := conn.Channel()
2744
if err != nil {
28-
return nil, nil, fmt.Errorf("failed to open a channel: %s", err.Error())
45+
return nil, fmt.Errorf("failed to open a channel: %s", err.Error())
2946
}
3047

3148
err = ch.Qos(10, 0, false) // fair dispatch
3249
if err != nil {
33-
return nil, nil, fmt.Errorf("failed to set QoS: %s", err.Error())
50+
return nil, fmt.Errorf("failed to set QoS: %s", err.Error())
3451
}
3552

36-
return conn, ch, nil
53+
return &Pubsub{conn: conn, ch: ch}, nil
3754
}
3855

3956
// initPubSub ...

0 commit comments

Comments
 (0)