Skip to content

Commit

Permalink
Merge pull request #285 from philips-software/bugfix/279
Browse files Browse the repository at this point in the history
Bugfix: unpack batch with failed messages #279
  • Loading branch information
loafoe authored Nov 11, 2022
2 parents 00e9c26 + 4164c35 commit f5e5144
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 43 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/loafoe/go-rabbitmq v0.5.0
github.com/opentracing/opentracing-go v1.2.0
github.com/openzipkin/zipkin-go v0.4.1
github.com/philips-software/go-hsdp-api v0.75.0
github.com/philips-software/go-hsdp-api v0.75.6
github.com/prometheus/client_golang v1.14.0
github.com/sirupsen/logrus v1.9.0
github.com/spf13/viper v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -831,8 +831,8 @@ github.com/pelletier/go-toml/v2 v2.0.5 h1:ipoSadvV8oGUjnUbMub59IDPPwfxF694nG/jwb
github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/philips-software/go-hsdp-api v0.75.0 h1:AjYazGCntd6jH2xtGhxqqgy49bjvvAC+4bljvFVDE8k=
github.com/philips-software/go-hsdp-api v0.75.0/go.mod h1:rd6uphXchFcYW2ehT5xWGobAZrIod7qSOLZUqWh61y4=
github.com/philips-software/go-hsdp-api v0.75.6 h1:ic04DulkTUgG3VQ0n2UdVy1cD8mjWgC3nVI//H4+Km4=
github.com/philips-software/go-hsdp-api v0.75.6/go.mod h1:WLknlRw2GiSmtDufXcy28YHOcbQXn3RfB9RrT5cimxQ=
github.com/philips-software/go-hsdp-signer v1.4.0 h1:yg7UILhmI4xJhr/tQiAiQwJL0EZFvLuMqpH2GZ9ygY4=
github.com/philips-software/go-hsdp-signer v1.4.0/go.mod h1:/QehZ/+Aks2t1TFpjhF/7ZSB8PJIIJHzLc03rOqwLw0=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
Expand Down
6 changes: 3 additions & 3 deletions queue/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func (n *nilStorer) StoreResources(_ []logging.Resource, count int) (*logging.St
Response: &http.Response{
StatusCode: http.StatusBadRequest,
},
Failed: map[int]logging.Resource{
10: {},
20: {},
Failed: []logging.Resource{
{},
{},
},
}, logging.ErrBatchErrors
}
Expand Down
45 changes: 9 additions & 36 deletions queue/deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,56 +111,29 @@ func BodyToResource(body []byte, m Metrics) (*logging.Resource, error) {
return resource, nil
}

func contains(s []int, e int) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}

func (pl *Deliverer) flushBatch(ctx context.Context, resources []logging.Resource, count int, queue Queue) (int, error) {
tracer := opentracing.GlobalTracer()
span, _ := opentracing.StartSpanFromContextWithTracer(ctx, tracer, "deliverer_flush_batch")
defer span.Finish()
fmt.Printf("batch flushing %d messages\n", count)
maxLoop := count
l := 0

for {
l++
resp, err := pl.storer.StoreResources(resources, count)
if err == nil { // Happy flow
break
}
resp, err := pl.storer.StoreResources(resources, count)
if err != nil { // Unpack and send individually
if resp == nil {
fmt.Printf("unexpected error for StoreResource(): %v\n", err)
continue
}
nrErrors := len(resp.Failed)
keys := make([]int, 0, nrErrors)
for k := range resp.Failed {
keys = append(keys, k)
return count, err
}
// Remove offending messages and resend
pos := 0
// Unpack and send individual messages
for i := 0; i < count; i++ {
if contains(keys, i) {
fmt.Printf("resending %d\n", i+1)
_, err = pl.storer.StoreResources([]logging.Resource{resources[i]}, 1)
if err != nil {
_ = queue.DeadLetter(resources[i])
continue
fmt.Printf("permanent failure sending %d resource: [%v] error: %v\n", i+1, resources[i], err)
}
resources[pos] = resources[i]
pos++
}
count = pos
fmt.Printf("Found %d errors. Resending %d\n", nrErrors, count)

if l > maxLoop || count <= 0 {
fmt.Printf("Maximum retries reached or nothingt to send. Bailing..\n")
break
}
}

return count, nil
}

Expand Down
2 changes: 1 addition & 1 deletion queue/deliverer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func TestDroppedMessages(t *testing.T) {
_, _ = io.Copy(&buf, r)

assert.Regexp(t, regexp.MustCompile("batch flushing 23 messages"), buf.String())
assert.Regexp(t, regexp.MustCompile("Found 2 errors. Resending 21"), buf.String())
assert.Regexp(t, regexp.MustCompile("resending 4"), buf.String())
}

func TestEncodeString(t *testing.T) {
Expand Down

0 comments on commit f5e5144

Please sign in to comment.