diff --git a/actor/engine_test.go b/actor/engine_test.go index a58f36e..698bb37 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -480,3 +480,68 @@ func TestMultipleStops(t *testing.T) { <-done } } + +func TestShouldNotBlockActorOnAccidentalDuplicateRespond(t *testing.T) { + e, err := NewEngine(NewEngineConfig()) + require.NoError(t, err) + pid := e.SpawnFunc(func(ctx *Context) { + msg := ctx.Message() + if s, ok := msg.(string); ok { + if s == "foo" { + ctx.Respond(len(s)) + // missing `return` statement, causing an unintended second response + } + ctx.Respond(len(s)) // sends a duplicate response + } + }, "str", WithID("len")) + + _ = e.Request(pid, "foo", 2*time.Second) // response will be consumed later + resp := e.Request(pid, "barbaz", 2*time.Second) + + r, err := resp.Result() + require.NoError(t, err) + require.Equal(t, 6, r) +} + +func TestShouldNotReceiveAccidentallySentSecondResult(t *testing.T) { + e, err := NewEngine(NewEngineConfig()) + require.NoError(t, err) + done := make(chan struct{}) + pid := e.SpawnFunc(func(ctx *Context) { + msg := ctx.Message() + if s, ok := msg.(string); ok && s == "foo" { + defer close(done) + if s == "foo" { + ctx.Respond(1) + // missing `return` statement, causing an unintended second response + } + select { // sends a duplicate response + case <-time.After(100 * time.Millisecond): + case <-runAsync(func() { + ctx.Respond(2) + }): + } + } + }, "kind") + + resp := e.Request(pid, "foo", 200*time.Millisecond) + <-done + + r, err := resp.Result() + require.NoError(t, err) + require.Equal(t, 1, r) + require.Nil(t, e.Registry.get(resp.pid)) + + r, err = resp.Result() + require.Error(t, err) + require.Nil(t, r) +} + +func runAsync(f func()) <-chan struct{} { + ch := make(chan struct{}) + go func() { + defer close(ch) + f() + }() + return ch +} diff --git a/actor/registry.go b/actor/registry.go index 8f578b8..f541877 100644 --- a/actor/registry.go +++ b/actor/registry.go @@ -69,3 +69,13 @@ func (r *Registry) add(proc Processer) { r.mu.Unlock() proc.Start() } + +func (r *Registry) remove(pid *PID) bool { + r.mu.Lock() + defer r.mu.Unlock() + _, ok := r.lookup[pid.ID] + if ok { + delete(r.lookup, pid.ID) + } + return ok +} diff --git a/actor/response.go b/actor/response.go index c6493f7..68d4aff 100644 --- a/actor/response.go +++ b/actor/response.go @@ -40,7 +40,11 @@ func (r *Response) Result() (any, error) { } func (r *Response) Send(_ *PID, msg any, _ *PID) { - r.result <- msg + // Under normal conditions, the method is expected to be called only once. + // To prevent accidental duplicate responses, we promptly remove the process from the registry + if r.engine.Registry.remove(r.pid) { + r.result <- msg + } } func (r *Response) PID() *PID { return r.pid }