Skip to content

Commit

Permalink
chore: Add new thread struct
Browse files Browse the repository at this point in the history
Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
  • Loading branch information
appleboy committed Jan 8, 2022
1 parent 4962bfc commit f69d990
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 25 deletions.
46 changes: 21 additions & 25 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Manager struct {
doneCtx context.Context
doneCtxCancel context.CancelFunc
logger Logger
runningWaitGroup sync.WaitGroup
runningWaitGroup *routineGroup
errors []error
runAtShutdown []ShtdownJob
}
Expand All @@ -45,7 +45,9 @@ func (g *Manager) doGracefulShutdown() {
g.shutdownCtxCancel()
// doing shutdown job
for _, f := range g.runAtShutdown {
g.doShutdownJob(f)
g.runningWaitGroup.Run(func() {
g.doShutdownJob(f)
})
}
go func() {
g.waitForJobs()
Expand Down Expand Up @@ -95,39 +97,33 @@ func (g *Manager) handleSignals(ctx context.Context) {

// doShutdownJob execute shutdown task
func (g *Manager) doShutdownJob(f ShtdownJob) {
go func() {
// to handle panic cases from inside the worker
defer func() {
if err := recover(); err != nil {
msg := fmt.Errorf("panic in shutdown job: %v", err)
g.logger.Error(msg)
g.lock.Lock()
g.errors = append(g.errors, msg)
g.lock.Unlock()
}
g.runningWaitGroup.Done()
}()
if err := f(); err != nil {
// to handle panic cases from inside the worker
defer func() {
if err := recover(); err != nil {
msg := fmt.Errorf("panic in shutdown job: %v", err)
g.logger.Error(msg)
g.lock.Lock()
g.errors = append(g.errors, err)
g.errors = append(g.errors, msg)
g.lock.Unlock()
}
}()
if err := f(); err != nil {
g.lock.Lock()
g.errors = append(g.errors, err)
g.lock.Unlock()
}
}

// AddShutdownJob add shutdown task
func (g *Manager) AddShutdownJob(f ShtdownJob) {
g.runningWaitGroup.Add(1)
g.lock.Lock()
g.runAtShutdown = append(g.runAtShutdown, f)
g.lock.Unlock()
}

// AddRunningJob add running task
func (g *Manager) AddRunningJob(f RunningJob) {
g.runningWaitGroup.Add(1)

go func() {
g.runningWaitGroup.Run(func() {
// to handle panic cases from inside the worker
defer func() {
if err := recover(); err != nil {
Expand All @@ -137,14 +133,13 @@ func (g *Manager) AddRunningJob(f RunningJob) {
g.errors = append(g.errors, msg)
g.lock.Unlock()
}
g.runningWaitGroup.Done()
}()
if err := f(g.shutdownCtx); err != nil {
g.lock.Lock()
g.errors = append(g.errors, err)
g.lock.Unlock()
}
}()
})
}

// Done allows the manager to be viewed as a context.Context.
Expand All @@ -156,9 +151,10 @@ func newManager(opts ...Option) *Manager {
startOnce.Do(func() {
o := newOptions(opts...)
manager = &Manager{
lock: &sync.RWMutex{},
logger: o.logger,
errors: make([]error, 0),
lock: &sync.RWMutex{},
logger: o.logger,
errors: make([]error, 0),
runningWaitGroup: newRoutineGroup(),
}
manager.start(o.ctx)
})
Expand Down
24 changes: 24 additions & 0 deletions thread.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package graceful

import "sync"

type routineGroup struct {
waitGroup sync.WaitGroup
}

func newRoutineGroup() *routineGroup {
return new(routineGroup)
}

func (g *routineGroup) Run(fn func()) {
g.waitGroup.Add(1)

go func() {
defer g.waitGroup.Done()
fn()
}()
}

func (g *routineGroup) Wait() {
g.waitGroup.Wait()
}
1 change: 1 addition & 0 deletions thread_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package graceful

0 comments on commit f69d990

Please sign in to comment.