Skip to content

Commit

Permalink
Merge pull request #42 from jxsl13/test/context-state
Browse files Browse the repository at this point in the history
initial test setup
  • Loading branch information
john-behm-bertelsmann authored Jan 4, 2024
2 parents 9f8c0af + 6d96b3b commit d315b02
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ down:
docker-compose down

test:
go test -v -race -count=1 ./...
go test -timeout 900s -v -race -count=1 ./...
33 changes: 24 additions & 9 deletions logging/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"sort"
"strings"
"sync"
"testing"
"time"
)
Expand All @@ -19,6 +20,9 @@ func NewTestLogger(t *testing.T) *TestLogger {
}

func newTestLoggerWithFields(l *TestLogger, fields Fields) *TestLogger {
l.mu.Lock()
defer l.mu.Unlock()

n := &TestLogger{
t: l.t,
fields: make(map[string]any, len(fields)+len(l.fields)),
Expand All @@ -38,6 +42,7 @@ func newTestLoggerWithFields(l *TestLogger, fields Fields) *TestLogger {
type TestLogger struct {
t *testing.T
fields map[string]any
mu sync.Mutex
}

func (l *TestLogger) Debugf(format string, args ...any) {
Expand Down Expand Up @@ -138,23 +143,33 @@ func (l *TestLogger) fieldsMsg(level, msg string) string {
func (l *TestLogger) logf(prefix, format string, args ...any) {
l.t.Helper()
lpref := strings.ToLower(prefix)
msg := fmt.Sprintf(format, args...)
if strings.Contains(lpref, "panic") || strings.Contains(lpref, "fatal") {
l.t.Fatal(l.fieldsMsg(prefix, msg))
} else {
l.t.Log(l.fieldsMsg(prefix, msg))
arg := fmt.Sprintf(format, args...)

l.mu.Lock()
defer l.mu.Unlock()
msg := l.fieldsMsg(prefix, arg)
if lpref == "panic" || lpref == "fatal" {
l.t.Fatal(msg)
} else {
l.t.Log(msg)
}

}

func (l *TestLogger) log(prefix string, args ...any) {
l.t.Helper()
msg := fmt.Sprint(args...)

lpref := strings.ToLower(prefix)
if strings.Contains(lpref, "panic") || strings.Contains(lpref, "fatal") {
l.t.Fatal(l.fieldsMsg(prefix, msg))
arg := fmt.Sprint(args...)

l.mu.Lock()
defer l.mu.Unlock()
msg := l.fieldsMsg(prefix, arg)
if lpref == "panic" || lpref == "fatal" {
l.mu.Lock()
l.t.Fatal(msg)
l.mu.Unlock()
} else {
l.t.Log(l.fieldsMsg(prefix, msg))
l.t.Log(msg)
}
}
19 changes: 7 additions & 12 deletions pool/helpers_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (c *cancelContext) Reset(parentCtx context.Context) error {
}

type stateContext struct {
mu sync.RWMutex
mu sync.Mutex

parentCtx context.Context

Expand Down Expand Up @@ -203,12 +203,12 @@ func (sc *stateContext) Resumed() {
sc.resumed.Cancel()
}

func (sc *stateContext) Resuming() doner {
return sc.resuming
func (sc *stateContext) Resuming() context.Context {
return sc.resuming.Context()
}

func (sc *stateContext) Pausing() doner {
return sc.pausing
func (sc *stateContext) Pausing() context.Context {
return sc.pausing.Context()
}

func (sc *stateContext) Pause(ctx context.Context) error {
Expand Down Expand Up @@ -333,8 +333,8 @@ func (sc *stateContext) AwaitPaused(ctx context.Context) (err error) {
}

func (sc *stateContext) isClosed() bool {
sc.mu.RLock()
defer sc.mu.RUnlock()
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.closed
}

Expand All @@ -354,8 +354,3 @@ func (sc *stateContext) closeUnguarded() {
sc.resumed.Cancel()
sc.closed = true
}

type doner interface {
Done() <-chan struct{}
Context() context.Context
}
184 changes: 184 additions & 0 deletions pool/helpers_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package pool

import (
"context"
"os"
"os/signal"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/jxsl13/amqpx/logging"
"github.com/stretchr/testify/assert"
)

func worker(t *testing.T, ctx context.Context, wg *sync.WaitGroup, sc *stateContext) {
defer wg.Done()

log := logging.NewTestLogger(t)
defer func() {
log.Debug("worker pausing (closing)")
sc.Paused()
log.Debug("worker paused (closing)")
log.Debug("worker closed")
}()
log.Debug("worker started")

for {
select {
case <-ctx.Done():
//log.Debug("worker done")
return
case <-sc.Resuming().Done():
//log.Debug("worker resuming")
sc.Resumed()
go func() {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
sc.Pause(ctx) // always have at least one goroutine that triggers the switch back to the other state after a specific time
}
}()
//log.Debug("worker resumed")
}
select {
case <-ctx.Done():
return
case <-sc.Pausing().Done():
//log.Debug("worker pausing")
sc.Paused()
//log.Debug("worker paused")
go func() {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
sc.Resume(ctx) // always have at least one goroutine that triggers the switch back to the other state after a specific time
}
}()
}
}
}

func TestStateContextSimpleSynchronized(t *testing.T) {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()

sc := newStateContext(ctx)
var wg sync.WaitGroup
wg.Add(1)
go worker(t, ctx, &wg, sc)

// normal execution order
assert.NoError(t, sc.Resume(ctx))
assert.NoError(t, sc.Pause(ctx))

// somewhat random execution order
assert.NoError(t, sc.Pause(ctx)) // if already paused, nobody cares
assert.NoError(t, sc.Resume(ctx))

assert.NoError(t, sc.Resume(ctx)) // should be ignored
assert.NoError(t, sc.Pause(ctx))

cancel()
wg.Wait()
}

func TestStateContextConcurrentTransitions(t *testing.T) {
log := logging.NewTestLogger(t)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()

sc := newStateContext(ctx)
var wwg sync.WaitGroup
wwg.Add(1)
go worker(t, ctx, &wwg, sc)

var wg sync.WaitGroup
var (
numGoroutines = 2000
iterations = 10000
)
trigger := make(chan struct{})

var (
pause atomic.Int64
resume atomic.Int64
)
wg.Add(numGoroutines)
for i := 0; i < (numGoroutines / 10 * 8); i++ {
go func(id int) {
defer wg.Done()
select {
case <-ctx.Done():
return
case <-trigger:
//log.Infof("routine %d triggered", id)
}
time.Sleep(time.Duration(id/100) * 20 * time.Millisecond)

for i := 0; i < iterations; i++ {
if id%2 == 0 {
assert.NoError(t, sc.Resume(ctx))
// log.Infof("routine %d resumed", id)
pause.Add(1)
} else {
assert.NoError(t, sc.Pause(ctx))
// log.Infof("routine %d paused", id)
resume.Add(1)
}
}
}(i)
}

var (
active atomic.Int64
awaitPaused atomic.Int64
awaitResumed atomic.Int64
)

for i := 0; i < (numGoroutines / 10 * 2); i++ {
go func(id int) {
defer wg.Done()
select {
case <-ctx.Done():
return
case <-trigger:
//log.Infof("routine %d triggered", id)
}
time.Sleep(time.Duration(id/100) * 10 * time.Millisecond)

for i := 0; i < iterations; i++ {
switch id % 3 {
case 0:
_, err := sc.IsActive(ctx)
assert.NoError(t, err)
active.Add(1)
case 1:
assert.NoError(t, sc.AwaitPaused(ctx))
// log.Infof("routine %d await paused", id)
awaitPaused.Add(1)
case 2:
assert.NoError(t, sc.AwaitResumed(ctx))
// log.Infof("routine %d await resumed", id)
awaitResumed.Add(1)
}
}

}(numGoroutines/2 + i)
}

close(trigger)
wg.Wait()

cancel()
wwg.Wait()

log.Debugf("pause: %d", pause.Load())
log.Debugf("resume: %d", resume.Load())
log.Debugf("active: %d", active.Load())
log.Debugf("awaitPaused: %d", awaitPaused.Load())
log.Debugf("awaitResumed: %d", awaitResumed.Load())
}
6 changes: 3 additions & 3 deletions pool/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (s *Subscriber) consume(h *Handler) (err error) {

// got a working session
delivery, err := session.ConsumeWithContext(
h.pausing().Context(),
h.pausing(),
opts.Queue,
opts.ConsumeOptions,
)
Expand Down Expand Up @@ -424,7 +424,7 @@ func (s *Subscriber) batchConsume(h *BatchHandler) (err error) {

// got a working session
delivery, err := session.ConsumeWithContext(
h.pausing().Context(),
h.pausing(),
opts.Queue,
opts.ConsumeOptions,
)
Expand Down Expand Up @@ -608,7 +608,7 @@ func (s *Subscriber) ackBatchPostHandle(opts BatchHandlerConfig, lastDeliveryTag

type handler interface {
QueueConfig() QueueConfig
pausing() doner
pausing() context.Context
}

func (s *Subscriber) returnSession(h handler, session *Session, err error) {
Expand Down
4 changes: 2 additions & 2 deletions pool/subscriber_batch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (h *BatchHandler) Pause(ctx context.Context) error {
return h.sc.Pause(ctx)
}

func (h *BatchHandler) pausing() doner {
func (h *BatchHandler) pausing() context.Context {
return h.sc.Pausing()
}

Expand All @@ -147,7 +147,7 @@ func (h *BatchHandler) Resume(ctx context.Context) error {
return h.sc.Resume(ctx)
}

func (h *BatchHandler) resuming() doner {
func (h *BatchHandler) resuming() context.Context {
return h.sc.Resuming()
}

Expand Down
4 changes: 2 additions & 2 deletions pool/subscriber_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (h *Handler) Pause(ctx context.Context) error {
return h.sc.Pause(ctx)
}

func (h *Handler) pausing() doner {
func (h *Handler) pausing() context.Context {
return h.sc.Pausing()
}

Expand All @@ -123,7 +123,7 @@ func (h *Handler) Resume(ctx context.Context) error {
return h.sc.Resume(ctx)
}

func (h *Handler) resuming() doner {
func (h *Handler) resuming() context.Context {
return h.sc.Resuming()
}

Expand Down

0 comments on commit d315b02

Please sign in to comment.