Retry message failures #189

Open · wants to merge 1 commit into base: master
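
As context for the diff below: the PR adds per-message retry options (WithRetries, WithMaxRetries) plus a new ActorUnprocessableMessageEvent that is broadcast when a message is dropped. A rough usage sketch, not part of the PR itself: the option names are taken from the diff, while the import path, the "worker" actor, and the timing are assumptions.

	package main

	import (
		"fmt"
		"time"

		"github.com/anthdm/hollywood/actor" // import path assumed from the upstream repo
	)

	func main() {
		e, err := actor.NewEngine(actor.NewEngineConfig())
		if err != nil {
			panic(err)
		}
		pid := e.SpawnFunc(func(c *actor.Context) {
			switch msg := c.Message().(type) {
			case string:
				fmt.Println("got:", msg)
			}
		}, "worker",
			actor.WithRetries(2),     // retry a failing message up to 2 times before dropping it
			actor.WithMaxRetries(2),  // together with Retries, bounds consecutive panics before a full restart
			actor.WithMaxRestarts(3), // existing option: give up after 3 restarts
		)
		e.Send(pid, "hello")
		time.Sleep(100 * time.Millisecond) // give the actor time to process before main exits
	}
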
7 changes: 3 additions & 4 deletions actor/engine_test.go
@@ -91,17 +91,16 @@ func TestRestartsMaxRestarts(t *testing.T) {
data int
}
pid := e.SpawnFunc(func(c *Context) {
fmt.Printf("Got message type %T\n", c.Message())
switch msg := c.Message().(type) {
case Started:
case Stopped:
case payload:
if msg.data != 1 {
panic("I failed to process this message")
} else {
fmt.Println("finally processed all my messages after borking.", msg.data)
}
}
}, "foo", WithMaxRestarts(restarts))
}, "foo", WithMaxRetries(1), WithMaxRestarts(restarts))

for i := 0; i < 2; i++ {
e.Send(pid, payload{i})
@@ -158,7 +157,7 @@ func TestRestarts(t *testing.T) {
wg.Done()
}
}
}, "foo", WithRestartDelay(time.Millisecond*10))
}, "foo", WithRetries(0), WithRestartDelay(time.Millisecond*10))

e.Send(pid, payload{1})
e.Send(pid, payload{2})
11 changes: 11 additions & 0 deletions actor/event.go
@@ -73,6 +73,17 @@ func (e ActorMaxRestartsExceededEvent) Log() (slog.Level, string, []any) {
return slog.LevelError, "Actor crashed too many times", []any{"pid", e.PID.GetID()}
}

// ActorUnprocessableMessageEvent gets published if an actor is unable to process a message after exhausting its retries.
type ActorUnprocessableMessageEvent struct {
PID *PID
Timestamp time.Time
Message any
}

func (e ActorUnprocessableMessageEvent) Log() (slog.Level, string, []any) {
return slog.LevelError, "Actor unable to process message", []any{"pid", e.PID.GetID()}
}

// ActorDuplicateIdEvent gets published if we try to register the same name twice.
type ActorDuplicateIdEvent struct {
PID *PID
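
A hedged sketch of how the new event could be consumed, mirroring the test further down (an actor subscribes its own PID to the engine's event stream). It assumes an *Engine named e is in scope and that the code lives in the actor package, as the tests do; the monitor actor itself is illustrative and not part of the PR.

	e.SpawnFunc(func(c *Context) {
		switch ev := c.Message().(type) {
		case Started:
			// Subscribe this actor's PID to the engine's event stream.
			c.Engine().Subscribe(c.pid)
		case ActorUnprocessableMessageEvent:
			// The message was dropped after its retries were exhausted.
			fmt.Printf("dropped %v from actor %s\n", ev.Message, ev.PID.GetID())
		}
	}, "unprocessable-monitor")
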
18 changes: 18 additions & 0 deletions actor/opts.go
@@ -8,6 +8,8 @@ import (
const (
defaultInboxSize = 1024
defaultMaxRestarts = 3
defaultRetries = 2
defaultMaxRetries = 2
)

var defaultRestartDelay = 500 * time.Millisecond
@@ -21,6 +23,8 @@ type Opts struct {
Kind string
ID string
MaxRestarts int32
Retries int32
MaxRetries int32
RestartDelay time.Duration
InboxSize int
Middleware []MiddlewareFunc
@@ -35,6 +39,8 @@ func DefaultOpts(p Producer) Opts {
Context: context.Background(),
Producer: p,
MaxRestarts: defaultMaxRestarts,
Retries: defaultRetries,
MaxRetries: defaultMaxRetries,
InboxSize: defaultInboxSize,
RestartDelay: defaultRestartDelay,
Middleware: []MiddlewareFunc{},
@@ -71,6 +77,18 @@ func WithMaxRestarts(n int) OptFunc {
}
}

func WithRetries(n int) OptFunc {
return func(opts *Opts) {
opts.Retries = int32(n)
}
}

func WithMaxRetries(n int) OptFunc {
return func(opts *Opts) {
opts.MaxRetries = int32(n)
}
}

func WithID(id string) OptFunc {
return func(opts *Opts) {
opts.ID = id
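
For reference, a minimal in-package sketch of how the new option functions mutate Opts (newFoo is a stand-in for any Producer and is not part of the PR):

	opts := DefaultOpts(newFoo) // Retries and MaxRetries both default to 2
	WithRetries(3)(&opts)       // opts.Retries == 3
	WithMaxRetries(1)(&opts)    // opts.MaxRetries == 1
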
64 changes: 44 additions & 20 deletions actor/process.go
@@ -32,18 +32,21 @@ type process struct {
context *Context
pid *PID
restarts int32
retries int32
mbuffer []Envelope
}

func newProcess(e *Engine, opts Opts) *process {
pid := NewPID(e.address, opts.Kind+pidSeparator+opts.ID)
ctx := newContext(opts.Context, e, pid)
p := &process{
pid: pid,
inbox: NewInbox(opts.InboxSize),
Opts: opts,
context: ctx,
mbuffer: nil,
pid: pid,
inbox: NewInbox(opts.InboxSize),
Opts: opts,
context: ctx,
mbuffer: nil,
restarts: 0,
retries: 0,
}
return p
}
@@ -61,34 +64,51 @@ func (p *process) Invoke(msgs []Envelope) {
nmsg = len(msgs)
// number of msgs that are processed.
nproc = 0
// FIXME: We could use nproc here, but for some reason placing nproc++ at the
// bottom of the loop freezes some tests. Hence, I created a new counter
// for bookkeeping.
processed = 0
)
defer func() {
// If we recovered, we buffer up all the messages that we could not process
// so we can retry them on the next restart.
if v := recover(); v != nil {
p.context.message = Stopped{}
p.context.receiver.Receive(p.context)
if p.Retries > 0 {
p.retries++
// If we are at max retries, drop the message and move on to the next one.
if p.retries%p.Retries == 0 {
p.context.engine.BroadcastEvent(ActorUnprocessableMessageEvent{
PID: p.pid,
Timestamp: time.Now(),
Message: msgs[nproc].Msg,
})
nproc++
}
} else {
// If the Retries opt equals 0, the failing message is dropped on every restart.
p.context.engine.BroadcastEvent(ActorUnprocessableMessageEvent{
PID: p.pid,
Timestamp: time.Now(),
Message: msgs[nproc].Msg,
})
nproc++
}

p.mbuffer = make([]Envelope, nmsg-nproc)
for i := 0; i < nmsg-nproc; i++ {
p.mbuffer[i] = msgs[i+nproc]
}
p.tryRestart(v)
if p.Retries == 0 || p.retries%(p.Retries*p.MaxRetries) == 0 {
p.tryRestart(v)
} else {
p.Invoke(p.mbuffer)
}
}
}()

for i := 0; i < len(msgs); i++ {
nproc++
for i := 0; i < nmsg; i++ {
msg := msgs[i]
if pill, ok := msg.Msg.(poisonPill); ok {
// If we need to gracefully stop, we process all the messages
// from the inbox, otherwise we ignore them and clean up.
if pill.graceful {
msgsToProcess := msgs[processed:]
msgsToProcess := msgs[nproc:]
for _, m := range msgsToProcess {
p.invokeMsg(m)
}
@@ -97,7 +117,8 @@ func (p *process) Invoke(msgs []Envelope) {
return
}
p.invokeMsg(msg)
processed++
p.retries = 0
nproc++
}
}

@@ -121,9 +142,8 @@ func (p *process) Start() {
p.context.receiver = recv
defer func() {
if v := recover(); v != nil {
p.context.message = Stopped{}
p.context.receiver.Receive(p.context)
p.tryRestart(v)
// The actor crashed too many times and exceeded max restarts, so let it terminate and let upstream handle it.
// TODO: maybe add some logging here to catch anything abnormal.
}
}()
p.context.message = Initialized{}
@@ -135,6 +155,7 @@
p.context.engine.BroadcastEvent(ActorStartedEvent{PID: p.pid, Timestamp: time.Now()})
// If we have messages in our buffer, invoke them.
if len(p.mbuffer) > 0 {
p.retries = 0
p.Invoke(p.mbuffer)
p.mbuffer = nil
}
@@ -167,6 +188,7 @@ func (p *process) tryRestart(v any) {
}

p.restarts++

// Restart the process after its restartDelay
p.context.engine.BroadcastEvent(ActorRestartedEvent{
PID: p.pid,
@@ -193,7 +215,9 @@ func (p *process) cleanup(cancel context.CancelFunc) {
}
}

p.inbox.Stop()
p.mbuffer = nil

_ = p.inbox.Stop()
p.context.engine.Registry.Remove(p.pid)
p.context.message = Stopped{}
applyMiddleware(p.context.receiver.Receive, p.Opts.Middleware...)(p.context)
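
A simplified, hedged paraphrase of the new recovery logic in Invoke. This helper does not exist in the PR; it only restates the arithmetic above, where retries counts consecutive panics since the last successfully processed message:

	func shouldDropAndRestart(retries, optRetries, optMaxRetries int32) (drop, restart bool) {
		if optRetries == 0 {
			// Retries disabled: the failing message is dropped and the actor restarts.
			return true, true
		}
		// Drop the failing message once its per-message retry budget is used up.
		drop = retries%optRetries == 0
		// Only restart after Retries*MaxRetries consecutive panics; otherwise the
		// remaining buffered messages are re-invoked directly.
		restart = retries%(optRetries*optMaxRetries) == 0
		return drop, restart
	}
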
80 changes: 62 additions & 18 deletions actor/process_test.go
@@ -3,45 +3,89 @@ package actor
import (
"bytes"
"fmt"
"github.com/stretchr/testify/assert"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type triggerPanic struct {
data int
}

func Test_DropsMessageAfterRetries(t *testing.T) {
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)

ch := make(chan any, 1)

pid := e.SpawnFunc(func(c *Context) {
fmt.Printf("Actor state: %T\n", c.Message())
switch m := c.Message().(type) {
case Started:
c.Engine().Subscribe(c.pid)
case triggerPanic:
if m.data == 2 {
panicWrapper()
} else {
fmt.Println("finally processed message", m.data)
}
case ActorUnprocessableMessageEvent:
fmt.Printf("Got message type %T data:%d\n", m.Message, m.Message.(triggerPanic).data)
ch <- m.Message
}
}, "kind", WithMaxRetries(1))

e.Send(pid, triggerPanic{1})
e.Send(pid, triggerPanic{2})
e.Send(pid, triggerPanic{3})

var messages []any
select {
case m := <-ch:
messages = append(messages, m)
case <-time.After(2 * time.Second):
t.Error("timeout")
}
require.Len(t, messages, 1)
require.IsType(t, triggerPanic{}, messages[0])
require.Equal(t, triggerPanic{data: 2}, messages[0])
}

// Test_CleanTrace tests that the stack trace is cleaned up correctly and that the function
// which triggers the panic is at the top of the stack trace.
func Test_CleanTrace(t *testing.T) {
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
type triggerPanic struct {
data int
}
stopCh := make(chan struct{})
ch := make(chan any)
pid := e.SpawnFunc(func(c *Context) {
fmt.Printf("Got message type %T\n", c.Message())
switch c.Message().(type) {
switch m := c.Message().(type) {
case Started:
c.Engine().Subscribe(c.pid)
case triggerPanic:
panicWrapper()
if m.data != 10 {
panicWrapper()
} else {
fmt.Println("finally processed all my messages", m.data)
}
case ActorRestartedEvent:
m := c.Message().(ActorRestartedEvent)
// split the panic into lines:
lines := bytes.Split(m.Stacktrace, []byte("\n"))
// check that the second line is the panicWrapper function:
if bytes.Contains(lines[1], []byte("panicWrapper")) {
fmt.Println("stack trace contains panicWrapper at the right line")
stopCh <- struct{}{}
}
// check that the second line is the panicWrapper function
assert.True(t, bytes.Contains(lines[1], []byte("panicWrapper")))
close(ch)
}
}, "foo", WithMaxRestarts(1))
e.Send(pid, triggerPanic{1})
}, "foo", WithRetries(2), WithMaxRetries(2), WithMaxRestarts(1))
e.Send(pid, triggerPanic{2})
e.Send(pid, triggerPanic{3})
e.Send(pid, triggerPanic{10})
select {
case <-stopCh:
fmt.Println("test passed")
case <-time.After(time.Second):
t.Error("test timed out. stack trace likely did not contain panicWrapper at the right line")
case <-ch:
case <-time.After(2 * time.Second):
t.Error("test timed out")
return
}
}
