From f69d990f9c95402e45b03cea288e5761a93ff0bf Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sat, 8 Jan 2022 14:44:35 +0800 Subject: [PATCH] chore: Add new thread struct Signed-off-by: Bo-Yi Wu --- manager.go | 46 +++++++++++++++++++++------------------------- thread.go | 24 ++++++++++++++++++++++++ thread_test.go | 1 + 3 files changed, 46 insertions(+), 25 deletions(-) create mode 100644 thread.go create mode 100644 thread_test.go diff --git a/manager.go b/manager.go index 533d848..1246b6f 100644 --- a/manager.go +++ b/manager.go @@ -28,7 +28,7 @@ type Manager struct { doneCtx context.Context doneCtxCancel context.CancelFunc logger Logger - runningWaitGroup sync.WaitGroup + runningWaitGroup *routineGroup errors []error runAtShutdown []ShtdownJob } @@ -45,7 +45,9 @@ func (g *Manager) doGracefulShutdown() { g.shutdownCtxCancel() // doing shutdown job for _, f := range g.runAtShutdown { - g.doShutdownJob(f) + g.runningWaitGroup.Run(func() { + g.doShutdownJob(f) + }) } go func() { g.waitForJobs() @@ -95,29 +97,25 @@ func (g *Manager) handleSignals(ctx context.Context) { // doShutdownJob execute shutdown task func (g *Manager) doShutdownJob(f ShtdownJob) { - go func() { - // to handle panic cases from inside the worker - defer func() { - if err := recover(); err != nil { - msg := fmt.Errorf("panic in shutdown job: %v", err) - g.logger.Error(msg) - g.lock.Lock() - g.errors = append(g.errors, msg) - g.lock.Unlock() - } - g.runningWaitGroup.Done() - }() - if err := f(); err != nil { + // to handle panic cases from inside the worker + defer func() { + if err := recover(); err != nil { + msg := fmt.Errorf("panic in shutdown job: %v", err) + g.logger.Error(msg) g.lock.Lock() - g.errors = append(g.errors, err) + g.errors = append(g.errors, msg) g.lock.Unlock() } }() + if err := f(); err != nil { + g.lock.Lock() + g.errors = append(g.errors, err) + g.lock.Unlock() + } } // AddShutdownJob add shutdown task func (g *Manager) AddShutdownJob(f ShtdownJob) { - g.runningWaitGroup.Add(1) g.lock.Lock() g.runAtShutdown = append(g.runAtShutdown, f) g.lock.Unlock() @@ -125,9 +123,7 @@ func (g *Manager) AddShutdownJob(f ShtdownJob) { // AddRunningJob add running task func (g *Manager) AddRunningJob(f RunningJob) { - g.runningWaitGroup.Add(1) - - go func() { + g.runningWaitGroup.Run(func() { // to handle panic cases from inside the worker defer func() { if err := recover(); err != nil { @@ -137,14 +133,13 @@ func (g *Manager) AddRunningJob(f RunningJob) { g.errors = append(g.errors, msg) g.lock.Unlock() } - g.runningWaitGroup.Done() }() if err := f(g.shutdownCtx); err != nil { g.lock.Lock() g.errors = append(g.errors, err) g.lock.Unlock() } - }() + }) } // Done allows the manager to be viewed as a context.Context. @@ -156,9 +151,10 @@ func newManager(opts ...Option) *Manager { startOnce.Do(func() { o := newOptions(opts...) manager = &Manager{ - lock: &sync.RWMutex{}, - logger: o.logger, - errors: make([]error, 0), + lock: &sync.RWMutex{}, + logger: o.logger, + errors: make([]error, 0), + runningWaitGroup: newRoutineGroup(), } manager.start(o.ctx) }) diff --git a/thread.go b/thread.go new file mode 100644 index 0000000..0e74ee9 --- /dev/null +++ b/thread.go @@ -0,0 +1,24 @@ +package graceful + +import "sync" + +type routineGroup struct { + waitGroup sync.WaitGroup +} + +func newRoutineGroup() *routineGroup { + return new(routineGroup) +} + +func (g *routineGroup) Run(fn func()) { + g.waitGroup.Add(1) + + go func() { + defer g.waitGroup.Done() + fn() + }() +} + +func (g *routineGroup) Wait() { + g.waitGroup.Wait() +} diff --git a/thread_test.go b/thread_test.go new file mode 100644 index 0000000..1fc4914 --- /dev/null +++ b/thread_test.go @@ -0,0 +1 @@ +package graceful