Skip to content

Commit

Permalink
Update redispatcher to respect task redispatch time (cadence-workflow…
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Jan 23, 2025
1 parent 04f123f commit 61caf40
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 129 deletions.
7 changes: 1 addition & 6 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2106,11 +2106,6 @@ const (

// key for history

// TaskRedispatchIntervalJitterCoefficient is the task redispatch interval jitter coefficient
// KeyName: history.taskRedispatchIntervalJitterCoefficient
// Value type: Float64
// Default value: 0.15
// Allowed filters: N/A
TaskRedispatchIntervalJitterCoefficient
// QueueProcessorRandomSplitProbability is the probability for a domain to be split to a new processing queue
// KeyName: history.queueProcessorRandomSplitProbability
Expand Down Expand Up @@ -4432,7 +4427,7 @@ var FloatKeys = map[FloatKey]DynamicFloat{
},
TaskRedispatchIntervalJitterCoefficient: {
KeyName: "history.taskRedispatchIntervalJitterCoefficient",
Description: "TaskRedispatchIntervalJitterCoefficient is the task redispatch interval jitter coefficient",
Description: "Deprecated",
DefaultValue: 0.15,
},
QueueProcessorRandomSplitProbability: {
Expand Down
58 changes: 28 additions & 30 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,20 @@ type Config struct {
StandbyTaskMissingEventsDiscardDelay dynamicconfig.DurationPropertyFn

// Task process settings
TaskProcessRPS dynamicconfig.IntPropertyFnWithDomainFilter
TaskSchedulerType dynamicconfig.IntPropertyFn
TaskSchedulerWorkerCount dynamicconfig.IntPropertyFn
TaskSchedulerShardWorkerCount dynamicconfig.IntPropertyFn
TaskSchedulerQueueSize dynamicconfig.IntPropertyFn
TaskSchedulerShardQueueSize dynamicconfig.IntPropertyFn
TaskSchedulerDispatcherCount dynamicconfig.IntPropertyFn
TaskSchedulerRoundRobinWeights dynamicconfig.MapPropertyFn
TaskCriticalRetryCount dynamicconfig.IntPropertyFn
ActiveTaskRedispatchInterval dynamicconfig.DurationPropertyFn
StandbyTaskRedispatchInterval dynamicconfig.DurationPropertyFn
TaskRedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
StandbyTaskReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter
EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
ResurrectionCheckMinDelay dynamicconfig.DurationPropertyFnWithDomainFilter
TaskProcessRPS dynamicconfig.IntPropertyFnWithDomainFilter
TaskSchedulerType dynamicconfig.IntPropertyFn
TaskSchedulerWorkerCount dynamicconfig.IntPropertyFn
TaskSchedulerShardWorkerCount dynamicconfig.IntPropertyFn
TaskSchedulerQueueSize dynamicconfig.IntPropertyFn
TaskSchedulerShardQueueSize dynamicconfig.IntPropertyFn
TaskSchedulerDispatcherCount dynamicconfig.IntPropertyFn
TaskSchedulerRoundRobinWeights dynamicconfig.MapPropertyFn
TaskCriticalRetryCount dynamicconfig.IntPropertyFn
ActiveTaskRedispatchInterval dynamicconfig.DurationPropertyFn
StandbyTaskRedispatchInterval dynamicconfig.DurationPropertyFn
StandbyTaskReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter
EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
ResurrectionCheckMinDelay dynamicconfig.DurationPropertyFnWithDomainFilter

// QueueProcessor settings
QueueProcessorEnableSplit dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -368,21 +367,20 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
DeleteHistoryEventContextTimeout: dc.GetIntProperty(dynamicconfig.DeleteHistoryEventContextTimeout),
MaxResponseSize: maxMessageSize,

TaskProcessRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskProcessRPS),
TaskSchedulerType: dc.GetIntProperty(dynamicconfig.TaskSchedulerType),
TaskSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerWorkerCount),
TaskSchedulerShardWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardWorkerCount),
TaskSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerQueueSize),
TaskSchedulerShardQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardQueueSize),
TaskSchedulerDispatcherCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerDispatcherCount),
TaskSchedulerRoundRobinWeights: dc.GetMapProperty(dynamicconfig.TaskSchedulerRoundRobinWeights),
TaskCriticalRetryCount: dc.GetIntProperty(dynamicconfig.TaskCriticalRetryCount),
ActiveTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.ActiveTaskRedispatchInterval),
StandbyTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.StandbyTaskRedispatchInterval),
TaskRedispatchIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TaskRedispatchIntervalJitterCoefficient),
StandbyTaskReReplicationContextTimeout: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.StandbyTaskReReplicationContextTimeout),
EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID),
ResurrectionCheckMinDelay: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.ResurrectionCheckMinDelay),
TaskProcessRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskProcessRPS),
TaskSchedulerType: dc.GetIntProperty(dynamicconfig.TaskSchedulerType),
TaskSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerWorkerCount),
TaskSchedulerShardWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardWorkerCount),
TaskSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerQueueSize),
TaskSchedulerShardQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardQueueSize),
TaskSchedulerDispatcherCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerDispatcherCount),
TaskSchedulerRoundRobinWeights: dc.GetMapProperty(dynamicconfig.TaskSchedulerRoundRobinWeights),
TaskCriticalRetryCount: dc.GetIntProperty(dynamicconfig.TaskCriticalRetryCount),
ActiveTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.ActiveTaskRedispatchInterval),
StandbyTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.StandbyTaskRedispatchInterval),
StandbyTaskReReplicationContextTimeout: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.StandbyTaskReReplicationContextTimeout),
EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID),
ResurrectionCheckMinDelay: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.ResurrectionCheckMinDelay),

QueueProcessorEnableSplit: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit),
QueueProcessorSplitMaxLevel: dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel),
Expand Down
1 change: 0 additions & 1 deletion service/history/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func TestNewConfig(t *testing.T) {
"TaskCriticalRetryCount": {dynamicconfig.TaskCriticalRetryCount, 37},
"ActiveTaskRedispatchInterval": {dynamicconfig.ActiveTaskRedispatchInterval, time.Second},
"StandbyTaskRedispatchInterval": {dynamicconfig.StandbyTaskRedispatchInterval, time.Second},
"TaskRedispatchIntervalJitterCoefficient": {dynamicconfig.TaskRedispatchIntervalJitterCoefficient, 1.0},
"StandbyTaskReReplicationContextTimeout": {dynamicconfig.StandbyTaskReReplicationContextTimeout, time.Second},
"EnableDropStuckTaskByDomainID": {dynamicconfig.EnableDropStuckTaskByDomainID, true},
"ResurrectionCheckMinDelay": {dynamicconfig.ResurrectionCheckMinDelay, time.Second},
Expand Down
3 changes: 1 addition & 2 deletions service/history/queue/processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ func newProcessorBase(
taskProcessor,
shard.GetTimeSource(),
&task.RedispatcherOptions{
TaskRedispatchInterval: options.RedispatchInterval,
TaskRedispatchIntervalJitterCoefficient: options.RedispatchIntervalJitterCoefficient,
TaskRedispatchInterval: options.RedispatchInterval,
},
logger,
metricsScope,
Expand Down
1 change: 0 additions & 1 deletion service/history/queue/processor_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type queueProcessorOptions struct {
UpdateAckInterval dynamicconfig.DurationPropertyFn
UpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
RedispatchInterval dynamicconfig.DurationPropertyFn
RedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
MaxRedispatchQueueSize dynamicconfig.IntPropertyFn
MaxStartJitterInterval dynamicconfig.DurationPropertyFn
SplitQueueInterval dynamicconfig.DurationPropertyFn
Expand Down
1 change: 0 additions & 1 deletion service/history/queue/timer_queue_processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,6 @@ func newTimerQueueProcessorOptions(
MaxPollIntervalJitterCoefficient: config.TimerProcessorMaxPollIntervalJitterCoefficient,
UpdateAckInterval: config.TimerProcessorUpdateAckInterval,
UpdateAckIntervalJitterCoefficient: config.TimerProcessorUpdateAckIntervalJitterCoefficient,
RedispatchIntervalJitterCoefficient: config.TaskRedispatchIntervalJitterCoefficient,
MaxRedispatchQueueSize: config.TimerProcessorMaxRedispatchQueueSize,
SplitQueueInterval: config.TimerProcessorSplitQueueInterval,
SplitQueueIntervalJitterCoefficient: config.TimerProcessorSplitQueueIntervalJitterCoefficient,
Expand Down
1 change: 0 additions & 1 deletion service/history/queue/transfer_queue_processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,6 @@ func newTransferQueueProcessorOptions(
MaxPollIntervalJitterCoefficient: config.TransferProcessorMaxPollIntervalJitterCoefficient,
UpdateAckInterval: config.TransferProcessorUpdateAckInterval,
UpdateAckIntervalJitterCoefficient: config.TransferProcessorUpdateAckIntervalJitterCoefficient,
RedispatchIntervalJitterCoefficient: config.TaskRedispatchIntervalJitterCoefficient,
MaxRedispatchQueueSize: config.TransferProcessorMaxRedispatchQueueSize,
SplitQueueInterval: config.TransferProcessorSplitQueueInterval,
SplitQueueIntervalJitterCoefficient: config.TransferProcessorSplitQueueIntervalJitterCoefficient,
Expand Down
85 changes: 27 additions & 58 deletions service/history/task/redispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ import (
const (
defaultBufferSize = 200

redispatchBackoffCoefficient = 1.05
redispatchMaxBackoffInternval = 2 * time.Minute
redispatchBackoffCoefficient = 1.05
redispatchMaxBackoffInternval = 2 * time.Minute
redispatchFailureBackoffInterval = 2 * time.Second
)

type (
Expand All @@ -50,27 +51,24 @@ type (

// RedispatcherOptions configs redispatch interval
RedispatcherOptions struct {
TaskRedispatchInterval dynamicconfig.DurationPropertyFn
TaskRedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
TaskRedispatchInterval dynamicconfig.DurationPropertyFn
}

redispatcherImpl struct {
sync.Mutex

taskProcessor Processor
timeSource clock.TimeSource
options *RedispatcherOptions
logger log.Logger
metricsScope metrics.Scope

status int32
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
redispatchCh chan redispatchNotification
redispatchTimer *time.Timer
backoffPolicy backoff.RetryPolicy
pqMap map[int]collection.Queue[redispatchTask] // priority -> redispatch queue
taskChFull map[int]bool // priority -> if taskCh is full
status int32
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
redispatchCh chan redispatchNotification
timerGate clock.TimerGate
backoffPolicy backoff.RetryPolicy
pqMap map[int]collection.Queue[redispatchTask] // priority -> redispatch queue
taskChFull map[int]bool // priority -> if taskCh is full
}

redispatchTask struct {
Expand All @@ -95,12 +93,12 @@ func NewRedispatcher(
return &redispatcherImpl{
taskProcessor: taskProcessor,
timeSource: timeSource,
options: options,
logger: logger,
metricsScope: metricsScope,
status: common.DaemonStatusInitialized,
shutdownCh: make(chan struct{}),
redispatchCh: make(chan redispatchNotification, 1),
timerGate: clock.NewTimerGate(timeSource),
backoffPolicy: backoffPolicy,
pqMap: make(map[int]collection.Queue[redispatchTask]),
taskChFull: make(map[int]bool),
Expand All @@ -124,14 +122,7 @@ func (r *redispatcherImpl) Stop() {
}

close(r.shutdownCh)

r.Lock()
if r.redispatchTimer != nil {
r.redispatchTimer.Stop()
}
r.redispatchTimer = nil
r.Unlock()

r.timerGate.Stop()
if success := common.AwaitWaitGroup(&r.shutdownWG, time.Minute); !success {
r.logger.Warn("Task redispatcher timedout on shutdown.", tag.LifeCycleStopTimedout)
}
Expand All @@ -144,16 +135,18 @@ func (r *redispatcherImpl) AddTask(task Task) {
attempt := task.GetAttempt()

r.Lock()
defer r.Unlock()
pq := r.getOrCreatePQLocked(priority)
t := r.getRedispatchTime(attempt)
pq.Add(redispatchTask{
task: task,
redispatchTime: r.getRedispatchTime(attempt),
redispatchTime: t,
})
r.Unlock()

r.setupTimerLocked()
r.timerGate.Update(t)
}

// TODO: review this method, it doesn't seem to redispatch the tasks immediately
func (r *redispatcherImpl) Redispatch(targetSize int) {
doneCh := make(chan struct{})
ntf := redispatchNotification{
Expand Down Expand Up @@ -185,6 +178,8 @@ func (r *redispatcherImpl) redispatchLoop() {
select {
case <-r.shutdownCh:
return
case <-r.timerGate.Chan():
r.redispatchTasks(redispatchNotification{})
case notification := <-r.redispatchCh:
r.redispatchTasks(notification)
}
Expand All @@ -199,10 +194,6 @@ func (r *redispatcherImpl) redispatchTasks(notification redispatchNotification)
if notification.doneCh != nil {
close(notification.doneCh)
}
if r.sizeLocked() > 0 {
// there are still tasks left in the queue, setup a redispatch timer for those tasks
r.setupTimerLocked()
}
}()

if r.isStopped() {
Expand Down Expand Up @@ -251,6 +242,7 @@ func (r *redispatcherImpl) redispatchTasks(notification redispatchNotification)
newPriority := item.task.Priority()
if err != nil || !submitted {
// failed to submit, enqueue again
item.redispatchTime = r.timeSource.Now().Add(redispatchFailureBackoffInterval)
r.getOrCreatePQLocked(newPriority).Add(item)
}
if err == nil && !submitted {
Expand All @@ -261,44 +253,21 @@ func (r *redispatcherImpl) redispatchTasks(notification redispatchNotification)
totalRedispatched++
}
}
if !pq.IsEmpty() {
item, _ := pq.Peek()
r.timerGate.Update(item.redispatchTime)
}
if r.isStopped() {
return
}
}
}

func (r *redispatcherImpl) setupTimerLocked() {
if r.redispatchTimer != nil || r.isStopped() {
return
}

r.redispatchTimer = time.AfterFunc(
backoff.JitDuration(
r.options.TaskRedispatchInterval(),
r.options.TaskRedispatchIntervalJitterCoefficient(),
),
func() {
r.Lock()
defer r.Unlock()
r.redispatchTimer = nil

select {
case r.redispatchCh <- redispatchNotification{
targetSize: 0,
doneCh: nil,
}:
default:
}
},
)
}

func (r *redispatcherImpl) sizeLocked() int {
size := 0
for _, queue := range r.pqMap {
size += queue.Len()
}

return size
}

Expand All @@ -308,7 +277,7 @@ func (r *redispatcherImpl) isStopped() bool {

func (r *redispatcherImpl) getRedispatchTime(attempt int) time.Time {
// note that elapsedTime (the first parameter) is not relevant when
// the retry policy has not expiration interval
// the retry policy has not expiration interval
return r.timeSource.Now().Add(r.backoffPolicy.ComputeNextDelay(0, attempt))
}

Expand Down
Loading

0 comments on commit 61caf40

Please sign in to comment.