Skip to content

Commit ce434db

Browse files
committed
fix: generic pubsub
1 parent 27ab176 commit ce434db

File tree

4 files changed

+10
-26
lines changed

4 files changed

+10
-26
lines changed

makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ test:
88

99
lint:
1010
@echo "Running linter..."
11-
@golangci-lint run
11+
@golangci-lint run

pubsub/consumer.go

+4-7
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,13 @@ type Consumer struct {
99
queue string
1010
}
1111

12-
// newConsumer ...
13-
func newConsumer(rabbitURL, queueName string) *Consumer {
14-
exchange := fmt.Sprintf("%s-%s-exchange", queuePrefix, queueName)
15-
queueName = fmt.Sprintf("%s-%s", queuePrefix, queueName)
16-
return &Consumer{rabbitURL, exchange, queueName}
12+
// Create a new consumer instance
13+
func NewConsumer(rabbitURL, exchange, queue string) *Consumer {
14+
return &Consumer{rabbitURL, exchange, queue}
1715
}
1816

1917
// Consume consume messages from the channels
20-
func Consume(queueName string, workerFunc func([]byte)) error {
21-
c := newConsumer(rabbitURL, queueName)
18+
func (c *Consumer) Consume(workerFunc func([]byte)) error {
2219
conn, ch, err := initQ(c.url)
2320
if err != nil {
2421
return fmt.Errorf("failed to initialize a connection: %s", err.Error())

pubsub/publisher.go

+5-9
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package pubsub
22

33
import (
4-
"fmt"
54
"time"
65

76
"github.com/streadway/amqp"
@@ -14,10 +13,9 @@ type publisher struct {
1413
queue string
1514
}
1615

17-
// newPublisher ...
18-
func newPublisher(queueName string) *publisher {
19-
exchange := fmt.Sprintf("%s-exchange", queueName)
20-
return &publisher{rabbitURL, exchange, queueName}
16+
// NewPublisher creates a new publisher to rabbit
17+
func NewPublisher(rabbitURL, exchange, queue string) *publisher {
18+
return &publisher{rabbitURL, exchange, queue}
2119
}
2220

2321
// publish ...
@@ -54,13 +52,11 @@ func (p *publisher) publish(body []byte, delay time.Duration) error {
5452
}
5553

5654
// PublishWithDelay ...
57-
func PublishWithDelay(queueName string, body []byte, delay time.Duration) error {
58-
p := newPublisher(fmt.Sprintf("%s-%s", queuePrefix, queueName))
55+
func (p *publisher) PublishWithDelay(queueName string, body []byte, delay time.Duration) error {
5956
return p.publish(body, delay)
6057
}
6158

6259
// Publish ...
63-
func Publish(queueName string, body []byte) error {
64-
p := newPublisher(fmt.Sprintf("%s-%s", queuePrefix, queueName))
60+
func (p *publisher) Publish(queueName string, body []byte) error {
6561
return p.publish(body, 0)
6662
}

pubsub/pubsub.go

-9
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,9 @@ package pubsub
33
import (
44
"fmt"
55

6-
"github.com/ochom/gutils/helpers"
76
"github.com/streadway/amqp"
87
)
98

10-
var (
11-
// rabbitURL is the URL of the RabbitMQ server
12-
rabbitURL = helpers.GetEnv("RABBIT_URL", "amqp://guest:guest@localhost:5672/")
13-
14-
// queuePrefix used to prefix the queue name to avoid conflict with other services
15-
queuePrefix = helpers.GetEnv("QUEUE_PREFIX", "dev")
16-
)
17-
189
func initQ(url string) (*amqp.Connection, *amqp.Channel, error) {
1910
conn, err := amqp.Dial(url)
2011
if err != nil {

0 commit comments

Comments
 (0)