Skip to content

Commit afa31a6

Browse files
committed
fix: generic pubsub
1 parent 405c292 commit afa31a6

File tree

3 files changed

+39
-19
lines changed

3 files changed

+39
-19
lines changed

pubsub/consumer.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,19 @@ import "fmt"
44

55
// Consumer ...
66
type Consumer struct {
7-
url string
8-
exchange string
9-
queue string
7+
url string
8+
exchange string
9+
queue string
10+
enableDelay bool
1011
}
1112

1213
// Create a new consumer instance
13-
func NewConsumer(rabbitURL, exchange, queue string) *Consumer {
14-
return &Consumer{rabbitURL, exchange, queue}
14+
func NewConsumer(rabbitURL, exchange, queue string, config ...Config) *Consumer {
15+
if len(config) > 0 {
16+
return &Consumer{rabbitURL, exchange, queue, config[0].EnableDelay}
17+
}
18+
19+
return &Consumer{rabbitURL, exchange, queue, false}
1520
}
1621

1722
// Consume consume messages from the channels
@@ -23,7 +28,7 @@ func (c *Consumer) Consume(workerFunc func([]byte)) error {
2328
defer ch.Close()
2429
defer conn.Close()
2530

26-
if err := initPubSub(ch, c.exchange, c.queue); err != nil {
31+
if err := initPubSub(ch, c.exchange, c.queue, c.enableDelay); err != nil {
2732
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
2833
}
2934

pubsub/publisher.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,19 @@ import (
99

1010
// publisher ...
1111
type publisher struct {
12-
url string
13-
exchange string
14-
queue string
12+
url string
13+
exchange string
14+
queue string
15+
enableDelay bool
1516
}
1617

1718
// NewPublisher creates a new publisher to rabbit
18-
func NewPublisher(rabbitURL, exchange, queue string) *publisher {
19-
return &publisher{rabbitURL, exchange, queue}
19+
func NewPublisher(rabbitURL, exchange, queue string, config ...Config) *publisher {
20+
if len(config) > 0 {
21+
return &publisher{rabbitURL, exchange, queue, config[0].EnableDelay}
22+
}
23+
24+
return &publisher{rabbitURL, exchange, queue, false}
2025
}
2126

2227
// publish ...
@@ -28,7 +33,7 @@ func (p *publisher) publish(body []byte, delay time.Duration) error {
2833
defer ch.Close()
2934
defer conn.Close()
3035

31-
if err := initPubSub(ch, p.exchange, p.queue); err != nil {
36+
if err := initPubSub(ch, p.exchange, p.queue, p.enableDelay); err != nil {
3237
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
3338
}
3439

pubsub/pubsub.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ import (
66
"github.com/streadway/amqp"
77
)
88

9+
// Config for publisher and consumer
10+
type Config struct {
11+
EnableDelay bool
12+
}
13+
914
func initQ(url string) (*amqp.Connection, *amqp.Channel, error) {
1015
conn, err := amqp.Dial(url)
1116
if err != nil {
@@ -26,14 +31,19 @@ func initQ(url string) (*amqp.Connection, *amqp.Channel, error) {
2631
}
2732

2833
// initPubSub ...
29-
func initPubSub(ch *amqp.Channel, exchangeName, queueName string) error {
34+
func initPubSub(ch *amqp.Channel, exchangeName, queueName string, enableDelay bool) error {
35+
exchangeType := "direct"
36+
if enableDelay {
37+
exchangeType = "x-delayed-message"
38+
}
39+
3040
err := ch.ExchangeDeclare(
31-
exchangeName, // name
32-
"x-delayed-message", // type
33-
true, // durable
34-
false, // auto-deleted
35-
false, // internal
36-
false, // no-wait
41+
exchangeName, // name
42+
exchangeType, // type
43+
true, // durable
44+
false, // auto-deleted
45+
false, // internal
46+
false, // no-wait
3747
amqp.Table{
3848
"x-delayed-type": "direct",
3949
}, // arguments

0 commit comments

Comments
 (0)