Skip to content

Commit

Permalink
feat: use v2 in example, small refactor of v2 experiment (#544)
Browse files Browse the repository at this point in the history
  • Loading branch information
RichardKnop authored May 11, 2020
1 parent 5d814a4 commit 5a6bfdd
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 66 deletions.
16 changes: 0 additions & 16 deletions example/config.yml

This file was deleted.

35 changes: 15 additions & 20 deletions example/machinery.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,24 @@ import (
"os"
"time"

opentracing "github.com/opentracing/opentracing-go"
opentracing_log "github.com/opentracing/opentracing-go/log"
"github.com/google/uuid"
"github.com/urfave/cli"

"github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/log"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/google/uuid"
"github.com/urfave/cli"
"github.com/RichardKnop/machinery/v2"

exampletasks "github.com/RichardKnop/machinery/example/tasks"
tracers "github.com/RichardKnop/machinery/example/tracers"
amqpbackend "github.com/RichardKnop/machinery/v1/backends/amqp"
amqpbroker "github.com/RichardKnop/machinery/v1/brokers/amqp"
opentracing "github.com/opentracing/opentracing-go"
opentracing_log "github.com/opentracing/opentracing-go/log"
)

var (
app *cli.App
configPath string
app *cli.App
)

func init() {
Expand All @@ -34,14 +35,6 @@ func init() {
app.Author = "Richard Knop"
app.Email = "risoknop@gmail.com"
app.Version = "0.0.0"
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "c",
Value: "",
Destination: &configPath,
Usage: "Path to a configuration file",
},
}
}

func main() {
Expand Down Expand Up @@ -74,10 +67,6 @@ func main() {
}

func loadConfig() (*config.Config, error) {
if configPath != "" {
return config.NewFromYaml(configPath, true)
}

return config.NewFromEnvironment(true)
}

Expand All @@ -88,10 +77,16 @@ func startServer() (*machinery.Server, error) {
}

// Create server instance
server, err := machinery.NewServer(cnf)
broker, err := amqpbroker.New(cnf), nil
if err != nil {
return nil, err
}
backend, err := amqpbackend.New(cnf), nil
if err != nil {
return nil, err
}

server := machinery.NewServer(cnf, broker, backend)

// Register tasks
tasks := map[string]interface{}{
Expand Down
9 changes: 5 additions & 4 deletions v2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"sync"

"github.com/google/uuid"
"github.com/opentracing/opentracing-go"

backendsiface "github.com/RichardKnop/machinery/v1/backends/iface"
"github.com/RichardKnop/machinery/v1/backends/result"
brokersiface "github.com/RichardKnop/machinery/v1/brokers/iface"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/RichardKnop/machinery/v1/tracing"

backendsiface "github.com/RichardKnop/machinery/v1/backends/iface"
brokersiface "github.com/RichardKnop/machinery/v1/brokers/iface"
opentracing "github.com/opentracing/opentracing-go"
)

// Server is the main Machinery object and stores all configuration
Expand All @@ -27,7 +28,7 @@ type Server struct {
prePublishHandler func(*tasks.Signature)
}

// NewServer ...
// NewServer creates Server instance
func NewServer(cnf *config.Config, brokerServer brokersiface.Broker, backendServer backendsiface.Backend) *Server {
return &Server{
config: cnf,
Expand Down
21 changes: 5 additions & 16 deletions v2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (

"github.com/stretchr/testify/assert"

"github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v2"

backend "github.com/RichardKnop/machinery/v1/backends/eager"
broker "github.com/RichardKnop/machinery/v1/brokers/eager"
)

func TestRegisterTasks(t *testing.T) {
Expand Down Expand Up @@ -74,19 +77,5 @@ func TestNewCustomQueueWorker(t *testing.T) {
}

func getTestServer(t *testing.T) *machinery.Server {
server, err := machinery.NewServer(&config.Config{
Broker: "amqp://guest:guest@localhost:5672/",
DefaultQueue: "machinery_tasks",
ResultBackend: "redis://127.0.0.1:6379",
AMQP: &config.AMQPConfig{
Exchange: "machinery_exchange",
ExchangeType: "direct",
BindingKey: "machinery_task",
PrefetchCount: 1,
},
})
if err != nil {
t.Error(err)
}
return server
return machinery.NewServer(&config.Config{}, broker.New(), backend.New())
}
29 changes: 22 additions & 7 deletions v2/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package machinery
import (
"errors"
"fmt"
"net/url"
"os"
"os/signal"
"sync"
Expand All @@ -11,7 +12,6 @@ import (

"github.com/opentracing/opentracing-go"

"github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/backends/amqp"
"github.com/RichardKnop/machinery/v1/brokers/errs"
"github.com/RichardKnop/machinery/v1/log"
Expand All @@ -32,6 +32,13 @@ type Worker struct {
preConsumeHandler func(*Worker) bool
}

var (
// ErrWorkerQuitGracefully is return when worker quit gracefully
ErrWorkerQuitGracefully = errors.New("Worker quit gracefully")
// ErrWorkerQuitGracefully is return when worker quit abruptly
ErrWorkerQuitAbruptly = errors.New("Worker quit abruptly")
)

// Launch starts a new worker process. The worker subscribes
// to the default queue and processes incoming registered tasks
func (worker *Worker) Launch() error {
Expand All @@ -49,13 +56,13 @@ func (worker *Worker) LaunchAsync(errorsChan chan<- error) {

// Log some useful information about worker configuration
log.INFO.Printf("Launching a worker with the following settings:")
log.INFO.Printf("- Broker: %s", machinery.RedactURL(cnf.Broker))
log.INFO.Printf("- Broker: %s", RedactURL(cnf.Broker))
if worker.Queue == "" {
log.INFO.Printf("- DefaultQueue: %s", cnf.DefaultQueue)
} else {
log.INFO.Printf("- CustomQueue: %s", worker.Queue)
}
log.INFO.Printf("- ResultBackend: %s", machinery.RedactURL(cnf.ResultBackend))
log.INFO.Printf("- ResultBackend: %s", RedactURL(cnf.ResultBackend))
if cnf.AMQP != nil {
log.INFO.Printf("- AMQP: %s", cnf.AMQP.Exchange)
log.INFO.Printf(" - Exchange: %s", cnf.AMQP.Exchange)
Expand Down Expand Up @@ -102,12 +109,12 @@ func (worker *Worker) LaunchAsync(errorsChan chan<- error) {
go func() {
signalWG.Add(1)
worker.Quit()
errorsChan <- errors.New("Worker quit gracefully")
errorsChan <- ErrWorkerQuitGracefully
signalWG.Done()
}()
} else {
// Abort the program when user hits Ctrl+C second time in a row
errorsChan <- errors.New("Worker quit abruptly")
errorsChan <- ErrWorkerQuitAbruptly
}
}
}
Expand Down Expand Up @@ -406,7 +413,7 @@ func (worker *Worker) SetPostTaskHandler(handler func(*tasks.Signature)) {
worker.postTaskHandler = handler
}

//SetPreConsumeHandler sets a custom handler func before the task is popped
//SetPreConsumeHandler sets a custom handler for the end of a job
func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool) {
worker.preConsumeHandler = handler
}
Expand All @@ -416,11 +423,19 @@ func (worker *Worker) GetServer() *Server {
return worker.server
}

// PreConsumeHandler calls the handler before the task is popped
//
func (worker *Worker) PreConsumeHandler() bool {
if worker.preConsumeHandler == nil {
return true
}

return worker.preConsumeHandler(worker)
}

func RedactURL(urlString string) string {
u, err := url.Parse(urlString)
if err != nil {
return urlString
}
return fmt.Sprintf("%s://%s", u.Scheme, u.Host)
}
16 changes: 13 additions & 3 deletions v2/worker_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
package machinery_test

import (
"github.com/RichardKnop/machinery/v2"
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"

"github.com/RichardKnop/machinery/v2"
)

func TestRedactURL(t *testing.T) {
t.Parallel()

broker := "amqp://guest:guest@localhost:5672"
redactedURL := machinery.RedactURL(broker)
assert.Equal(t, "amqp://localhost:5672", redactedURL)
}

func TestPreConsumeHandler(t *testing.T) {
t.Parallel()
worker := &machinery.Worker{}
Expand All @@ -16,4 +26,4 @@ func TestPreConsumeHandler(t *testing.T) {

func SamplePreConsumeHandler(w *machinery.Worker) bool {
return true
}
}

0 comments on commit 5a6bfdd

Please sign in to comment.