Skip to content

Commit

Permalink
Merge pull request #26 from Microkubes/amqp-channel-fix
Browse files Browse the repository at this point in the history
Fixes concurency problems with rabbitmq.Channel.
  • Loading branch information
blazhovsky authored May 27, 2019
2 parents 111bd71 + cf8823f commit 9d19ff7
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 29 deletions.
18 changes: 1 addition & 17 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/Microkubes/microservice-registration/app"
"github.com/Microkubes/microservice-registration/config"
"github.com/Microkubes/microservice-tools/gateway"
"github.com/Microkubes/microservice-tools/rabbitmq"
"github.com/Microkubes/microservice-tools/utils/healthcheck"
"github.com/Microkubes/microservice-tools/utils/version"
"github.com/keitaroinc/goa"
Expand All @@ -30,19 +29,6 @@ func main() {
panic(err)
}

connRabbitMQ, channelRabbitMQ, err := rabbitmq.Dial(
cfg.RabbitMQ["username"],
cfg.RabbitMQ["password"],
cfg.RabbitMQ["host"],
cfg.RabbitMQ["post"],
)
if err != nil {
service.LogError("rabbitmq", "err", err)
panic(err)
}
defer connRabbitMQ.Close()
defer channelRabbitMQ.Close()

registration := gateway.NewKongGateway(cfg.GatewayAdminURL, &http.Client{}, &cfg.Microservice)
err = registration.SelfRegister()
if err != nil {
Expand All @@ -68,9 +54,7 @@ func main() {
c2 := NewUserController(
service,
cfg,
&rabbitmq.AMQPChannel{
Channel: channelRabbitMQ,
},
CreateRabbitmqChannel,
&http.Client{},
)
app.MountUserController(service, c2)
Expand Down
57 changes: 47 additions & 10 deletions user.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/pem"
"fmt"
"io/ioutil"
"log"
"net/http"
"time"

Expand All @@ -19,14 +20,16 @@ import (
jwtgo "github.com/dgrijalva/jwt-go"
"github.com/keitaroinc/goa"
uuid "github.com/satori/go.uuid"
"github.com/streadway/amqp"
)

// UserController implements the user resource.
type UserController struct {
*goa.Controller
Config *config.Config
ChannelRabbitMQ rabbitmq.Channel
Client *http.Client
Config *config.Config
ChannelRabbitMQ rabbitmq.Channel
Client *http.Client
createAmqpChannel AmqpChannelFactory
}

// AMQPMessage holds data for "email-queue" AMQP channel
Expand All @@ -42,19 +45,21 @@ type UserProfile struct {
Email string
}

type AmqpChannelFactory func(*config.Config) (*amqp.Connection, rabbitmq.Channel, error)

// NewUserController creates a user controller.
func NewUserController(service *goa.Service, config *config.Config, channelRabbitMQ rabbitmq.Channel, client *http.Client) *UserController {
func NewUserController(service *goa.Service, config *config.Config, amqpFactory AmqpChannelFactory, client *http.Client) *UserController {
hystrix.ConfigureCommand("user-microservice.create_user", hystrix.CommandConfig{
Timeout: 90000,
})
hystrix.ConfigureCommand("user-microservice.update_user_profile", hystrix.CommandConfig{
Timeout: 90000,
})
return &UserController{
Controller: service.NewController("UserController"),
Config: config,
ChannelRabbitMQ: channelRabbitMQ,
Client: client,
Controller: service.NewController("UserController"),
Config: config,
Client: client,
createAmqpChannel: amqpFactory,
}
}

Expand Down Expand Up @@ -196,7 +201,16 @@ func (c *UserController) Register(ctx *app.RegisterUserContext) error {
}

if ctx.Payload.SendActivationMail {
if err := c.ChannelRabbitMQ.Send("email-queue", body); err != nil {
amqpConn, amqpChan, err := c.createAmqpChannel(c.Config)

if err != nil {
log.Println("Failed to open connection to queue: ", err.Error())
return ctx.InternalServerError(goa.ErrInternal(err))
}

defer amqpConn.Close()

if err := amqpChan.Send("email-queue", body); err != nil {
c.Service.LogError("Register: failed to serialize email payload.", "err", err.Error())
return ctx.InternalServerError(goa.ErrInternal(err))
}
Expand Down Expand Up @@ -330,7 +344,15 @@ func (c *UserController) scheduleSendVerificationMail(userID string, profile *Us
return err
}

if err = c.ChannelRabbitMQ.Send("verification-email", body); err != nil {
amqpConn, amqpChan, err := c.createAmqpChannel(c.Config)
if err != nil {
return err
}
if amqpConn != nil {
defer amqpConn.Close()
}

if err = amqpChan.Send("verification-email", body); err != nil {
return err
}

Expand Down Expand Up @@ -449,3 +471,18 @@ type RestClientError struct {
func (e *RestClientError) Error() string {
return fmt.Sprintf("%d %s %s", e.Code, e.StatusLine, e.Message)
}

func CreateRabbitmqChannel(cfg *config.Config) (*amqp.Connection, rabbitmq.Channel, error) {
connRabbitMQ, channelRabbitMQ, err := rabbitmq.Dial(
cfg.RabbitMQ["username"],
cfg.RabbitMQ["password"],
cfg.RabbitMQ["host"],
cfg.RabbitMQ["post"],
)
if err != nil {
return nil, nil, err
}
return connRabbitMQ, &rabbitmq.AMQPChannel{
Channel: channelRabbitMQ,
}, nil
}
10 changes: 8 additions & 2 deletions user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"os"
"testing"

"github.com/Microkubes/microservice-tools/rabbitmq"
"github.com/streadway/amqp"

"gopkg.in/h2non/gock.v1"

"github.com/Microkubes/microservice-registration/app"
"github.com/Microkubes/microservice-registration/app/test"
"github.com/Microkubes/microservice-registration/config"
"github.com/Microkubes/microservice-tools/rabbitmq"
"github.com/keitaroinc/goa"
)

Expand Down Expand Up @@ -52,9 +54,13 @@ var _ = json.Unmarshal(configBytes, cfg)

var (
service = goa.New("user-test")
ctrl = NewUserController(service, cfg, &rabbitmq.MockAMQPChannel{}, &http.Client{})
ctrl = NewUserController(service, cfg, CreateMockAmqpChannel, &http.Client{})
)

func CreateMockAmqpChannel(cfg *config.Config) (*amqp.Connection, rabbitmq.Channel, error) {
return nil, &rabbitmq.MockAMQPChannel{}, nil
}

func TestMain(m *testing.M) {
privkey, _ := rsa.GenerateKey(rand.Reader, 2048)
bytes := x509.MarshalPKCS1PrivateKey(privkey)
Expand Down

0 comments on commit 9d19ff7

Please sign in to comment.