Skip to content

Commit b467a6f

Browse files
committed
fix: generic pubsub
1 parent afa31a6 commit b467a6f

File tree

3 files changed

+25
-32
lines changed

3 files changed

+25
-32
lines changed

pubsub/consumer.go

+7-11
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,15 @@ import "fmt"
44

55
// Consumer ...
66
type Consumer struct {
7-
url string
8-
exchange string
9-
queue string
10-
enableDelay bool
7+
url string
8+
exchange string
9+
queue string
10+
exchangeType string
1111
}
1212

1313
// Create a new consumer instance
14-
func NewConsumer(rabbitURL, exchange, queue string, config ...Config) *Consumer {
15-
if len(config) > 0 {
16-
return &Consumer{rabbitURL, exchange, queue, config[0].EnableDelay}
17-
}
18-
19-
return &Consumer{rabbitURL, exchange, queue, false}
14+
func NewConsumer(rabbitURL, exchange, queue string, exchangeType ExchangeType) *Consumer {
15+
return &Consumer{rabbitURL, exchange, queue, string(exchangeType)}
2016
}
2117

2218
// Consume consume messages from the channels
@@ -28,7 +24,7 @@ func (c *Consumer) Consume(workerFunc func([]byte)) error {
2824
defer ch.Close()
2925
defer conn.Close()
3026

31-
if err := initPubSub(ch, c.exchange, c.queue, c.enableDelay); err != nil {
27+
if err := initPubSub(ch, c.exchange, c.queue, c.exchangeType); err != nil {
3228
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
3329
}
3430

pubsub/publisher.go

+7-11
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,15 @@ import (
99

1010
// publisher ...
1111
type publisher struct {
12-
url string
13-
exchange string
14-
queue string
15-
enableDelay bool
12+
url string
13+
exchange string
14+
queue string
15+
exchangeType string
1616
}
1717

1818
// NewPublisher creates a new publisher to rabbit
19-
func NewPublisher(rabbitURL, exchange, queue string, config ...Config) *publisher {
20-
if len(config) > 0 {
21-
return &publisher{rabbitURL, exchange, queue, config[0].EnableDelay}
22-
}
23-
24-
return &publisher{rabbitURL, exchange, queue, false}
19+
func NewPublisher(rabbitURL, exchange, queue string, exchangeType ExchangeType) *publisher {
20+
return &publisher{rabbitURL, exchange, queue, string(exchangeType)}
2521
}
2622

2723
// publish ...
@@ -33,7 +29,7 @@ func (p *publisher) publish(body []byte, delay time.Duration) error {
3329
defer ch.Close()
3430
defer conn.Close()
3531

36-
if err := initPubSub(ch, p.exchange, p.queue, p.enableDelay); err != nil {
32+
if err := initPubSub(ch, p.exchange, p.queue, p.exchangeType); err != nil {
3733
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
3834
}
3935

pubsub/pubsub.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,16 @@ import (
66
"github.com/streadway/amqp"
77
)
88

9-
// Config for publisher and consumer
10-
type Config struct {
11-
EnableDelay bool
12-
}
9+
// ExchangeType ...
10+
type ExchangeType string
11+
12+
var (
13+
Direct ExchangeType = "direct"
14+
Topic ExchangeType = "topic"
15+
Fanout ExchangeType = "fanout"
16+
Headers ExchangeType = "headers"
17+
Delayed ExchangeType = "x-delayed-message"
18+
)
1319

1420
func initQ(url string) (*amqp.Connection, *amqp.Channel, error) {
1521
conn, err := amqp.Dial(url)
@@ -31,12 +37,7 @@ func initQ(url string) (*amqp.Connection, *amqp.Channel, error) {
3137
}
3238

3339
// initPubSub ...
34-
func initPubSub(ch *amqp.Channel, exchangeName, queueName string, enableDelay bool) error {
35-
exchangeType := "direct"
36-
if enableDelay {
37-
exchangeType = "x-delayed-message"
38-
}
39-
40+
func initPubSub(ch *amqp.Channel, exchangeName, queueName, exchangeType string) error {
4041
err := ch.ExchangeDeclare(
4142
exchangeName, // name
4243
exchangeType, // type

0 commit comments

Comments
 (0)