Skip to content
This repository has been archived by the owner on May 19, 2023. It is now read-only.

Commit

Permalink
Merge pull request #19 from XenitAB/errgroup
Browse files Browse the repository at this point in the history
Use errgroup and context
  • Loading branch information
simongottschlag authored Jun 10, 2021
2 parents 3aea010 + fcf7bb3 commit ef89f3a
Show file tree
Hide file tree
Showing 15 changed files with 518 additions and 278 deletions.
58 changes: 2 additions & 56 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,61 +44,7 @@ jobs:
- name: Build and load (current arch)
run: |
docker buildx build --load -t ${{ env.NAME }}:${{ steps.prep.outputs.VERSION }} .
- name: Prepare test
run: |
mkdir -p tmp
docker network create --driver bridge endtoend
docker run --network endtoend -p 1883:1883 -v "$(pwd)"/test/vernemq:/mnt -e "DOCKER_VERNEMQ_VMQ_ACL__ACL_FILE=/mnt/vmq.acl" -e "DOCKER_VERNEMQ_ACCEPT_EULA=yes" -e "DOCKER_VERNEMQ_ALLOW_ANONYMOUS=on" --name vernemq1 -d vernemq/vernemq
# Give vernemq a few seconds to start
sleep 10
echo MQTT_BROKER_ADDRESSES=vernemq1 > test/env
echo MQTT_TOPIC=test/log_entry >> test/env
docker run --network endtoend --env-file ./test/env -p 8080:8080 --name ${{ env.NAME }} -d ${{ env.NAME }}:${{ steps.prep.outputs.VERSION }}
sudo apt-get install mosquitto-clients
- name: Test mqtt-log-stdout
- name: Run end-to-end test
run: |
set -e
publish_messages() {
for i in `seq 1 $1`; do
mosquitto_pub -h localhost -p 1883 -t "test/log_entry" -i "publisher-${2}" -m "End-to-end test message (publisher-${2}): ${i}"
done
}
publish_messages 200 1 &
publish_messages 200 2 &
publish_messages 200 3 &
publish_messages 200 4 &
# Wait for all jobs to complete
wait
# Give the application a few seconds to process the messages
sleep 10
NUM_MESSAGES_RECEIVED=$(docker logs ${{ env.NAME }} | grep "End-to-end test message" | wc -l)
PUB1=$(docker logs ${{ env.NAME }} | grep "End-to-end test message (publisher-1)" | wc -l)
PUB2=$(docker logs ${{ env.NAME }} | grep "End-to-end test message (publisher-2)" | wc -l)
PUB3=$(docker logs ${{ env.NAME }} | grep "End-to-end test message (publisher-3)" | wc -l)
PUB4=$(docker logs ${{ env.NAME }} | grep "End-to-end test message (publisher-4)" | wc -l)
echo "Publisher #1: ${PUB1}"
echo "Publisher #2: ${PUB2}"
echo "Publisher #3: ${PUB3}"
echo "Publisher #4: ${PUB4}"
if [[ "${NUM_MESSAGES_RECEIVED}" != "800" ]]; then
echo Expected 800 messages received. Was: ${NUM_MESSAGES_RECEIVED}
exit 1
fi
METRICS_COUNTER=$(curl -s localhost:8080/metrics | grep "mqtt_client_total_messages" | grep -v "#" | awk '{print $2}')
echo Metrics counter: ${METRICS_COUNTER}
if [[ "${METRICS_COUNTER}" != "800" ]]; then
echo Expected metrics counter to be 800. Was: ${METRICS_COUNTER}
exit 1
fi
make e2e TAG=${{ steps.prep.outputs.VERSION }}
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ gosec:
.PHONY: cover
.SILENT: cover
cover:
go test -timeout 1m ./... -coverprofile=tmp/coverage.out 16:10:38
go test -timeout 1m -coverpkg=./... -coverprofile=tmp/coverage.out ./...
go tool cover -html=tmp/coverage.out

.PHONY: run
Expand All @@ -78,3 +78,10 @@ gen-docs:
.SILENT: build
build:
go build -ldflags "-w -s -X main.Version=$(VERSION) -X main.Revision=$(REVISION) -X main.Created=$(CREATED)" -o bin/mqtt-log-stdout cmd/mqtt-log-stdout/main.go

.PHONY: e2e
.SILENT: e2e
e2e:
./test/e2e/prepare.sh $(IMG)
./test/e2e/test.sh
./test/e2e/cleanup.sh
68 changes: 22 additions & 46 deletions cmd/mqtt-log-stdout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import (
"fmt"
"os"
"os/signal"
"syscall"

"github.com/hashicorp/go-multierror"
"github.com/xenitab/mqtt-log-stdout/pkg/config"
h "github.com/xenitab/mqtt-log-stdout/pkg/helper"
"github.com/xenitab/mqtt-log-stdout/pkg/message"
"github.com/xenitab/mqtt-log-stdout/pkg/metrics"
"github.com/xenitab/mqtt-log-stdout/pkg/mqtt"
Expand All @@ -24,71 +23,54 @@ var (
)

func main() {
cfg, err := newConfigClient()
cfg, err := newConfigClient(Version, Revision, Created)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to generate config: %q\n", err)
os.Exit(1)
}

err = start(cfg)
err = run(cfg)
if err != nil {
os.Exit(1)
}

os.Exit(0)
}

func start(cfg config.Client) error {
stopChan := newStopChannel()
func run(cfg config.Client) error {
errGroup, ctx, cancel := h.NewErrGroupAndContext()
defer cancel()

stopChan := h.NewStopChannel()
defer signal.Stop(stopChan)

statusClient := newStatusClient(cfg)
messageClient := newMessageClient()
metricsServer := newMetricsServer(cfg, statusClient)
mqttClient := newMqttClient(cfg, statusClient, messageClient)

go metricsServer.Start()

var result error
err := mqttClient.Start()
if err != nil {
statusClient.Print("Received error starting mqtt client", err)
result = multierror.Append(result, err)
}

stoppedBy := func() string {
select {
case sig := <-stopChan:
return fmt.Sprintf("os.Signal (%s)", sig)
case <-mqttClient.Done():
return "mqtt client"
case <-metricsServer.Done():
return "metrics server"
}
}()
h.StartService(ctx, errGroup, metricsServer)
h.StartService(ctx, errGroup, mqttClient)

stoppedBy := h.WaitForStop(stopChan, ctx)
statusClient.Print(fmt.Sprintf("Application stopping, initiated by: %s", stoppedBy), nil)

err = mqttClient.Stop()
if err != nil {
statusClient.Print("Received error stopping mqtt client", err)
result = multierror.Append(result, err)
}
cancel()

err = metricsServer.Stop()
if err != nil {
statusClient.Print("Received error stopping metrics server", err)
result = multierror.Append(result, err)
}
timeoutCtx, timeoutCancel := h.NewShutdownTimeoutContext()
defer timeoutCancel()

return result
h.StopService(timeoutCtx, errGroup, mqttClient)
h.StopService(timeoutCtx, errGroup, metricsServer)

return h.WaitForErrGroup(errGroup)
}

func newConfigClient() (config.Client, error) {
func newConfigClient(version, revision, created string) (config.Client, error) {
opts := config.Options{
Version: Version,
Revision: Revision,
Created: Created,
Version: version,
Revision: revision,
Created: created,
}

return config.NewClient(opts)
Expand Down Expand Up @@ -135,9 +117,3 @@ func newMqttClient(cfg config.Client, statusClient status.Client, messageClient

return mqtt.NewClient(opts)
}

func newStopChannel() chan os.Signal {
stopChan := make(chan os.Signal, 2)
signal.Notify(stopChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE)
return stopChan
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ require (
github.com/eclipse/paho.mqtt.golang v1.3.4
github.com/fhmq/hmq v0.0.0-20210318020249-ccbe364f9fbe
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-multierror v1.1.1
github.com/prometheus/client_golang v1.10.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.25.0
github.com/stretchr/testify v1.4.0
github.com/urfave/cli/v2 v2.3.0
go.uber.org/goleak v1.1.10
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
)
4 changes: 1 addition & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,11 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
Expand Down Expand Up @@ -394,6 +391,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
47 changes: 47 additions & 0 deletions pkg/helper/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package helper

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"golang.org/x/sync/errgroup"
)

func NewErrGroupAndContext() (*errgroup.Group, context.Context, context.CancelFunc) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
return g, ctx, cancel
}

func WaitForErrGroup(g *errgroup.Group) error {
err := g.Wait()
if err != nil {
return fmt.Errorf("error groups error: %w", err)
}

return nil
}

func NewShutdownTimeoutContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), 2*time.Second)
}

func WaitForStop(stopChan chan os.Signal, ctx context.Context) string {
select {
case sig := <-stopChan:
return fmt.Sprintf("os.Signal (%s)", sig)
case <-ctx.Done():
return "context"
}
}

func NewStopChannel() chan os.Signal {
stopChan := make(chan os.Signal, 2)
signal.Notify(stopChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE)
return stopChan
}
27 changes: 27 additions & 0 deletions pkg/helper/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package helper

import (
"context"

"golang.org/x/sync/errgroup"
)

type ServiceStarter interface {
Start(ctx context.Context) error
}

type ServiceStopper interface {
Stop(ctx context.Context) error
}

func StartService(ctx context.Context, g *errgroup.Group, s ServiceStarter) {
g.Go(func() error {
return s.Start(ctx)
})
}

func StopService(ctx context.Context, g *errgroup.Group, s ServiceStopper) {
g.Go(func() error {
return s.Stop(ctx)
})
}
Loading

0 comments on commit ef89f3a

Please sign in to comment.