From 9acf2a289d63f35dd7d49843e59f1012b2a8050f Mon Sep 17 00:00:00 2001
From: jxsl13
Date: Wed, 3 Jan 2024 19:57:09 +0100
Subject: [PATCH 1/5] initial test setup

---
 pool/helpers_context_test.go | 151 +++++++++++++++++++++++++++++++++++
 1 file changed, 151 insertions(+)
 create mode 100644 pool/helpers_context_test.go

diff --git a/pool/helpers_context_test.go b/pool/helpers_context_test.go
new file mode 100644
index 0000000..6dab528
--- /dev/null
+++ b/pool/helpers_context_test.go
@@ -0,0 +1,151 @@
+package pool
+
+import (
+	"context"
+	"os"
+	"os/signal"
+	"sync"
+	"sync/atomic"
+	"testing"
+
+	"github.com/jxsl13/amqpx/logging"
+	"github.com/stretchr/testify/assert"
+)
+
+func worker(t *testing.T, ctx context.Context, sc *stateContext) {
+	log := logging.NewTestLogger(t)
+	defer func() {
+		log.Debug("worker pausing (closing)")
+		sc.Paused()
+		log.Debug("worker paused (closing)")
+		log.Debug("worker closed")
+	}()
+	log.Debug("worker started")
+
+	for {
+		select {
+		case <-ctx.Done():
+			log.Debug("worker done")
+			return
+		case <-sc.Resuming().Done():
+			log.Debug("worker resuming")
+			sc.Resumed()
+			log.Debug("worker resumed")
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-sc.Pausing().Done():
+			log.Debug("worker pausing")
+			sc.Paused()
+			log.Debug("worker paused")
+		}
+	}
+}
+
+func TestStateContextSimpleSynchronized(t *testing.T) {
+	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
+	defer cancel()
+
+	sc := newStateContext(ctx)
+	go worker(t, ctx, sc)
+
+	// normal execution order
+	assert.NoError(t, sc.Resume(ctx))
+	assert.NoError(t, sc.Pause(ctx))
+
+	// somewhat random execution order
+	assert.NoError(t, sc.Pause(ctx)) // if already paused, nobody cares
+	assert.NoError(t, sc.Resume(ctx))
+
+	assert.NoError(t, sc.Resume(ctx)) // should be ignored
+	assert.NoError(t, sc.Pause(ctx))
+}
+
+func TestStateContextConcurrentTransitions(t *testing.T) {
+	log := logging.NewTestLogger(t)
+	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
+	defer cancel()
+
+	sc := newStateContext(ctx)
+	go worker(t, ctx, sc)
+
+	var wg sync.WaitGroup
+	numGoroutines := 100
+	trigger := make(chan struct{})
+
+	var (
+		pause  atomic.Int64
+		resume atomic.Int64
+	)
+	wg.Add(numGoroutines / 2)
+	for i := 0; i < numGoroutines/2; i++ {
+		go func(id int) {
+			defer wg.Done()
+			select {
+			case <-ctx.Done():
+				return
+			case <-trigger:
+				log.Infof("routine %d triggered", id)
+			}
+
+			for i := 0; i < 1000; i++ {
+				if id%2 == 0 {
+					assert.NoError(t, sc.Resume(ctx))
+					// log.Infof("routine %d resumed", id)
+					pause.Add(1)
+				} else {
+					assert.NoError(t, sc.Pause(ctx))
+					// log.Infof("routine %d paused", id)
+					resume.Add(1)
+				}
+			}
+		}(i)
+	}
+
+	var (
+		active       atomic.Int64
+		awaitPaused  atomic.Int64
+		awaitResumed atomic.Int64
+	)
+
+	wg.Add(numGoroutines / 2)
+	for i := 0; i < numGoroutines/2; i++ {
+		go func(id int) {
+			defer wg.Done()
+			select {
+			case <-ctx.Done():
+				return
+			case <-trigger:
+				log.Infof("routine %d triggered", id)
+			}
+
+			for i := 0; i < 1000; i++ {
+				switch id % 3 {
+				case 0:
+					_, err := sc.IsActive(ctx)
+					assert.NoError(t, err)
+					active.Add(1)
+				case 1:
+					assert.NoError(t, sc.AwaitPaused(ctx))
+					// log.Infof("routine %d await paused", id)
+					awaitPaused.Add(1)
+				case 2:
+					assert.NoError(t, sc.AwaitResumed(ctx))
+					// log.Infof("routine %d await resumed", id)
+					awaitResumed.Add(1)
+				}
+			}
+
+		}(numGoroutines/2 + i)
+	}
+
+	close(trigger)
+	wg.Wait()
+
+	log.Debugf("pause: %d", pause.Load())
+	log.Debugf("resume: %d", resume.Load())
+	log.Debugf("active: %d", active.Load())
+	log.Debugf("awaitPaused: %d", awaitPaused.Load())
+	log.Debugf("awaitResumed: %d", awaitResumed.Load())
+}

From 2b71a62f4eb1660e464a235765d2fceb3eb0eb13 Mon Sep 17 00:00:00 2001
From: jxsl13
Date: Thu, 4 Jan 2024 17:10:23 +0100
Subject: [PATCH 2/5] add mutex lock in test logger wrapper

---
 logging/testing.go | 33 ++++++++++++++++++++++++---------
 1 file changed, 24 insertions(+), 9 deletions(-)

diff --git a/logging/testing.go b/logging/testing.go
index c89505d..4060252 100644
--- a/logging/testing.go
+++ b/logging/testing.go
@@ -5,6 +5,7 @@ import (
 	"os"
 	"sort"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 )
@@ -19,6 +20,9 @@ func NewTestLogger(t *testing.T) *TestLogger {
 }
 
 func newTestLoggerWithFields(l *TestLogger, fields Fields) *TestLogger {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
 	n := &TestLogger{
 		t:      l.t,
 		fields: make(map[string]any, len(fields)+len(l.fields)),
@@ -38,6 +42,7 @@ func newTestLoggerWithFields(l *TestLogger, fields Fields) *TestLogger {
 type TestLogger struct {
 	t      *testing.T
 	fields map[string]any
+	mu     sync.Mutex
 }
 
 func (l *TestLogger) Debugf(format string, args ...any) {
@@ -138,23 +143,33 @@ func (l *TestLogger) fieldsMsg(level, msg string) string {
 func (l *TestLogger) logf(prefix, format string, args ...any) {
 	l.t.Helper()
 	lpref := strings.ToLower(prefix)
-	msg := fmt.Sprintf(format, args...)
-	if strings.Contains(lpref, "panic") || strings.Contains(lpref, "fatal") {
-		l.t.Fatal(l.fieldsMsg(prefix, msg))
-	} else {
-		l.t.Log(l.fieldsMsg(prefix, msg))
+	arg := fmt.Sprintf(format, args...)
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	msg := l.fieldsMsg(prefix, arg)
+	if lpref == "panic" || lpref == "fatal" {
+		l.t.Fatal(msg)
+	} else {
+		l.t.Log(msg)
 	}
 }
 
 func (l *TestLogger) log(prefix string, args ...any) {
 	l.t.Helper()
-	msg := fmt.Sprint(args...)
+
 	lpref := strings.ToLower(prefix)
-	if strings.Contains(lpref, "panic") || strings.Contains(lpref, "fatal") {
-		l.t.Fatal(l.fieldsMsg(prefix, msg))
+	arg := fmt.Sprint(args...)
+
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	msg := l.fieldsMsg(prefix, arg)
+	if lpref == "panic" || lpref == "fatal" {
+		l.mu.Lock()
+		l.t.Fatal(msg)
+		l.mu.Unlock()
 	} else {
-		l.t.Log(l.fieldsMsg(prefix, msg))
+		l.t.Log(msg)
 	}
 }

From f898c0275e9cd0c201c96aca9df61cf4f5e6563c Mon Sep 17 00:00:00 2001
From: jxsl13
Date: Thu, 4 Jan 2024 17:10:31 +0100
Subject: [PATCH 3/5] update makefile

---
 Makefile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Makefile b/Makefile
index 22ff27e..fd6dfa1 100644
--- a/Makefile
+++ b/Makefile
@@ -8,4 +8,4 @@ down:
 	docker-compose down
 
 test:
-	go test -v -race -count=1 ./...
+	go test -timeout 900s -v -race -count=1 ./...
From 26fa24489a4059c5773e56d92779331a9a182176 Mon Sep 17 00:00:00 2001
From: jxsl13
Date: Thu, 4 Jan 2024 17:36:46 +0100
Subject: [PATCH 4/5] add tests for state context

---
 pool/helpers_context_test.go | 72 +++++++++++++++++++++++++++---------
 1 file changed, 55 insertions(+), 17 deletions(-)

diff --git a/pool/helpers_context_test.go b/pool/helpers_context_test.go
index 6dab528..96c1f0a 100644
--- a/pool/helpers_context_test.go
+++ b/pool/helpers_context_test.go
@@ -7,12 +7,15 @@ import (
 	"sync"
 	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/jxsl13/amqpx/logging"
 	"github.com/stretchr/testify/assert"
 )
 
-func worker(t *testing.T, ctx context.Context, sc *stateContext) {
+func worker(t *testing.T, ctx context.Context, wg *sync.WaitGroup, sc *stateContext) {
+	defer wg.Done()
+
 	log := logging.NewTestLogger(t)
 	defer func() {
 		log.Debug("worker pausing (closing)")
@@ -25,20 +28,36 @@ func worker(t *testing.T, ctx context.Context, sc *stateContext) {
 	for {
 		select {
 		case <-ctx.Done():
-			log.Debug("worker done")
+			//log.Debug("worker done")
 			return
 		case <-sc.Resuming().Done():
-			log.Debug("worker resuming")
+			//log.Debug("worker resuming")
 			sc.Resumed()
-			log.Debug("worker resumed")
+			go func() {
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(time.Second):
+					sc.Pause(ctx) // always have at least one goroutine that triggers the switch back to the other state after a specific time
+				}
+			}()
+			//log.Debug("worker resumed")
 		}
 		select {
 		case <-ctx.Done():
 			return
 		case <-sc.Pausing().Done():
-			log.Debug("worker pausing")
+			//log.Debug("worker pausing")
 			sc.Paused()
-			log.Debug("worker paused")
+			//log.Debug("worker paused")
+			go func() {
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(time.Second):
+					sc.Resume(ctx) // always have at least one goroutine that triggers the switch back to the other state after a specific time
+				}
+			}()
 		}
 	}
 }
@@ -48,7 +67,9 @@ func TestStateContextSimpleSynchronized(t *testing.T) {
 	defer cancel()
 
 	sc := newStateContext(ctx)
-	go worker(t, ctx, sc)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go worker(t, ctx, &wg, sc)
 
 	// normal execution order
 	assert.NoError(t, sc.Resume(ctx))
 	assert.NoError(t, sc.Pause(ctx))
@@ -60,6 +81,9 @@ func TestStateContextSimpleSynchronized(t *testing.T) {
 
 	assert.NoError(t, sc.Resume(ctx)) // should be ignored
 	assert.NoError(t, sc.Pause(ctx))
+
+	cancel()
+	wg.Wait()
 }
 
 func TestStateContextConcurrentTransitions(t *testing.T) {
@@ -68,36 +92,44 @@ func TestStateContextConcurrentTransitions(t *testing.T) {
 	defer cancel()
 
 	sc := newStateContext(ctx)
-	go worker(t, ctx, sc)
+	var wwg sync.WaitGroup
+	wwg.Add(1)
+	go worker(t, ctx, &wwg, sc)
 
 	var wg sync.WaitGroup
-	numGoroutines := 100
+	var (
+		numGoroutines = 1000
+		iterations    = 1000
+	)
 	trigger := make(chan struct{})
 
 	var (
 		pause  atomic.Int64
 		resume atomic.Int64
 	)
-	wg.Add(numGoroutines / 2)
-	for i := 0; i < numGoroutines/2; i++ {
+	wg.Add(numGoroutines)
+	for i := 0; i < (numGoroutines / 10 * 8); i++ {
 		go func(id int) {
 			defer wg.Done()
 			select {
 			case <-ctx.Done():
 				return
 			case <-trigger:
-				log.Infof("routine %d triggered", id)
+				//log.Infof("routine %d triggered", id)
 			}
+			time.Sleep(time.Duration(id) * 2 * time.Millisecond)
 
-			for i := 0; i < 1000; i++ {
+			for i := 0; i < iterations; i++ {
 				if id%2 == 0 {
 					assert.NoError(t, sc.Resume(ctx))
 					// log.Infof("routine %d resumed", id)
 					pause.Add(1)
+					time.Sleep(20 * time.Millisecond)
 				} else {
 					assert.NoError(t, sc.Pause(ctx))
 					// log.Infof("routine %d paused", id)
 					resume.Add(1)
+					time.Sleep(20 * time.Millisecond)
 				}
 			}
 		}(i)
 	}
@@ -109,31 +141,34 @@ func TestStateContextConcurrentTransitions(t *testing.T) {
 		awaitResumed atomic.Int64
 	)
 
-	wg.Add(numGoroutines / 2)
-	for i := 0; i < numGoroutines/2; i++ {
+	for i := 0; i < (numGoroutines / 10 * 2); i++ {
 		go func(id int) {
 			defer wg.Done()
 			select {
 			case <-ctx.Done():
 				return
 			case <-trigger:
-				log.Infof("routine %d triggered", id)
+				//log.Infof("routine %d triggered", id)
 			}
+			time.Sleep(time.Duration(id) * 10 * time.Millisecond)
 
-			for i := 0; i < 1000; i++ {
+			for i := 0; i < iterations; i++ {
 				switch id % 3 {
 				case 0:
 					_, err := sc.IsActive(ctx)
 					assert.NoError(t, err)
 					active.Add(1)
+					time.Sleep(20 * time.Millisecond)
 				case 1:
 					assert.NoError(t, sc.AwaitPaused(ctx))
 					// log.Infof("routine %d await paused", id)
 					awaitPaused.Add(1)
+					time.Sleep(20 * time.Millisecond)
 				case 2:
 					assert.NoError(t, sc.AwaitResumed(ctx))
 					// log.Infof("routine %d await resumed", id)
 					awaitResumed.Add(1)
+					time.Sleep(20 * time.Millisecond)
 				}
 			}
 
 		}(numGoroutines/2 + i)
 	}
@@ -143,6 +178,9 @@ func TestStateContextConcurrentTransitions(t *testing.T) {
 
 	close(trigger)
 	wg.Wait()
 
+	cancel()
+	wwg.Wait()
+
 	log.Debugf("pause: %d", pause.Load())
 	log.Debugf("resume: %d", resume.Load())
 	log.Debugf("active: %d", active.Load())

From 6d96b3b50195450ec26dcda980f2b5e9aec18548 Mon Sep 17 00:00:00 2001
From: jxsl13
Date: Thu, 4 Jan 2024 17:46:26 +0100
Subject: [PATCH 5/5] remove replace RWMutex with mutex and refactor the internal interface slightly

---
 pool/helpers_context.go          | 19 +++++++------------
 pool/helpers_context_test.go     | 13 ++++---------
 pool/subscriber.go               |  6 +++---
 pool/subscriber_batch_handler.go |  4 ++--
 pool/subscriber_handler.go       |  4 ++--
 5 files changed, 18 insertions(+), 28 deletions(-)

diff --git a/pool/helpers_context.go b/pool/helpers_context.go
index c7bb178..2ee7746 100644
--- a/pool/helpers_context.go
+++ b/pool/helpers_context.go
@@ -117,7 +117,7 @@ func (c *cancelContext) Reset(parentCtx context.Context) error {
 }
 
 type stateContext struct {
-	mu sync.RWMutex
+	mu sync.Mutex
 
 	parentCtx context.Context
 
@@ -203,12 +203,12 @@ func (sc *stateContext) Resumed() {
 	sc.resumed.Cancel()
 }
 
-func (sc *stateContext) Resuming() doner {
-	return sc.resuming
+func (sc *stateContext) Resuming() context.Context {
+	return sc.resuming.Context()
 }
 
-func (sc *stateContext) Pausing() doner {
-	return sc.pausing
+func (sc *stateContext) Pausing() context.Context {
+	return sc.pausing.Context()
 }
 
 func (sc *stateContext) Pause(ctx context.Context) error {
@@ -333,8 +333,8 @@ func (sc *stateContext) AwaitPaused(ctx context.Context) (err error) {
 }
 
 func (sc *stateContext) isClosed() bool {
-	sc.mu.RLock()
-	defer sc.mu.RUnlock()
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
 	return sc.closed
 }
@@ -354,8 +354,3 @@ func (sc *stateContext) closeUnguarded() {
 	sc.resumed.Cancel()
 	sc.closed = true
 }
-
-type doner interface {
-	Done() <-chan struct{}
-	Context() context.Context
-}
diff --git a/pool/helpers_context_test.go b/pool/helpers_context_test.go
index 96c1f0a..2010d18 100644
--- a/pool/helpers_context_test.go
+++ b/pool/helpers_context_test.go
@@ -98,8 +98,8 @@ func TestStateContextConcurrentTransitions(t *testing.T) {
 
 	var wg sync.WaitGroup
 	var (
-		numGoroutines = 1000
-		iterations    = 1000
+		numGoroutines = 2000
+		iterations    = 10000
 	)
 	trigger := make(chan struct{})
 
@@ -117,19 +117,17 @@ func TestStateContextConcurrentTransitions(t *testing.T) {
 			case <-trigger:
 				//log.Infof("routine %d triggered", id)
 			}
-			time.Sleep(time.Duration(id) * 2 * time.Millisecond)
+			time.Sleep(time.Duration(id/100) * 20 * time.Millisecond)
 
 			for i := 0; i < iterations; i++ {
 				if id%2 == 0 {
 					assert.NoError(t, sc.Resume(ctx))
 					// log.Infof("routine %d resumed", id)
 					pause.Add(1)
-					time.Sleep(20 * time.Millisecond)
 				} else {
 					assert.NoError(t, sc.Pause(ctx))
 					// log.Infof("routine %d paused", id)
 					resume.Add(1)
-					time.Sleep(20 * time.Millisecond)
 				}
 			}
 		}(i)
@@ -150,7 +148,7 @@ func TestStateContextConcurrentTransitions(t *testing.T) {
 			case <-trigger:
 				//log.Infof("routine %d triggered", id)
 			}
-			time.Sleep(time.Duration(id) * 10 * time.Millisecond)
+			time.Sleep(time.Duration(id/100) * 10 * time.Millisecond)
 
 			for i := 0; i < iterations; i++ {
 				switch id % 3 {
@@ -158,17 +156,14 @@ func TestStateContextConcurrentTransitions(t *testing.T) {
 					_, err := sc.IsActive(ctx)
 					assert.NoError(t, err)
 					active.Add(1)
-					time.Sleep(20 * time.Millisecond)
 				case 1:
 					assert.NoError(t, sc.AwaitPaused(ctx))
 					// log.Infof("routine %d await paused", id)
 					awaitPaused.Add(1)
-					time.Sleep(20 * time.Millisecond)
 				case 2:
 					assert.NoError(t, sc.AwaitResumed(ctx))
 					// log.Infof("routine %d await resumed", id)
 					awaitResumed.Add(1)
-					time.Sleep(20 * time.Millisecond)
 				}
 			}
 
diff --git a/pool/subscriber.go b/pool/subscriber.go
index 6a357f4..c7619a2 100644
--- a/pool/subscriber.go
+++ b/pool/subscriber.go
@@ -308,7 +308,7 @@ func (s *Subscriber) consume(h *Handler) (err error) {
 
 	// got a working session
 	delivery, err := session.ConsumeWithContext(
-		h.pausing().Context(),
+		h.pausing(),
 		opts.Queue,
 		opts.ConsumeOptions,
 	)
@@ -424,7 +424,7 @@ func (s *Subscriber) batchConsume(h *BatchHandler) (err error) {
 
 	// got a working session
 	delivery, err := session.ConsumeWithContext(
-		h.pausing().Context(),
+		h.pausing(),
 		opts.Queue,
 		opts.ConsumeOptions,
 	)
@@ -608,7 +608,7 @@ func (s *Subscriber) ackBatchPostHandle(opts BatchHandlerConfig, lastDeliveryTag
 
 type handler interface {
 	QueueConfig() QueueConfig
-	pausing() doner
+	pausing() context.Context
 }
 
 func (s *Subscriber) returnSession(h handler, session *Session, err error) {
diff --git a/pool/subscriber_batch_handler.go b/pool/subscriber_batch_handler.go
index b8bcc58..f5ea9db 100644
--- a/pool/subscriber_batch_handler.go
+++ b/pool/subscriber_batch_handler.go
@@ -134,7 +134,7 @@ func (h *BatchHandler) Pause(ctx context.Context) error {
 	return h.sc.Pause(ctx)
 }
 
-func (h *BatchHandler) pausing() doner {
+func (h *BatchHandler) pausing() context.Context {
 	return h.sc.Pausing()
 }
 
@@ -147,7 +147,7 @@ func (h *BatchHandler) Resume(ctx context.Context) error {
 	return h.sc.Resume(ctx)
 }
 
-func (h *BatchHandler) resuming() doner {
+func (h *BatchHandler) resuming() context.Context {
 	return h.sc.Resuming()
 }
 
diff --git a/pool/subscriber_handler.go b/pool/subscriber_handler.go
index c2cc04a..0062afa 100644
--- a/pool/subscriber_handler.go
+++ b/pool/subscriber_handler.go
@@ -110,7 +110,7 @@ func (h *Handler) Pause(ctx context.Context) error {
 	return h.sc.Pause(ctx)
 }
 
-func (h *Handler) pausing() doner {
+func (h *Handler) pausing() context.Context {
 	return h.sc.Pausing()
 }
 
@@ -123,7 +123,7 @@ func (h *Handler) Resume(ctx context.Context) error {
 	return h.sc.Resume(ctx)
 }
 
-func (h *Handler) resuming() doner {
+func (h *Handler) resuming() context.Context {
 	return h.sc.Resuming()
 }