Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource group: support more mode for burstable #9044

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ const (

reservedDefaultGroupName = "default"
middlePriority = 8
unlimitedRate = math.MaxInt32
unlimitedBurstLimit = -1
)

// Manager is the manager of resource group.
Expand Down Expand Up @@ -168,8 +170,8 @@ func (m *Manager) Init(ctx context.Context) error {
RUSettings: &RequestUnitSettings{
RU: &GroupTokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: math.MaxInt32,
BurstLimit: -1,
FillRate: unlimitedRate,
BurstLimit: unlimitedBurstLimit,
},
},
},
Expand Down
90 changes: 68 additions & 22 deletions pkg/mcs/resourcemanager/server/token_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,52 @@
)

const (
defaultReserveRatio = 0.5
defaultLoanCoefficient = 2
maxAssignTokens = math.MaxFloat64 / 1024 // assume max client connect is 1024
slotExpireTimeout = 10 * time.Minute
defaultBurstLimitFactor = 8.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is not suitable for high rate limit settings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about 2 or 4? Maybe we need a logarithmic-like function rather than a simple multiplication?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, logarithmic-like is more reasonable.

defaultReserveRatio = 0.5
defaultLoanCoefficient = 2
maxAssignTokens = math.MaxFloat64 / 1024 // assume max client connect is 1024
slotExpireTimeout = 10 * time.Minute
)

type burstableMode int

const (
limited burstableMode = iota // burstlimit is greater than 0
rateControlled // burstlimit is 0
unlimited // burstlimit is -1
moderated // burstlimit is -2
)

func getBurstableMode(settings *rmpb.TokenLimitSettings) burstableMode {
if settings == nil {
return limited
}

Check warning on line 53 in pkg/mcs/resourcemanager/server/token_buckets.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/token_buckets.go#L52-L53

Added lines #L52 - L53 were not covered by tests
// BurstLimit is used as below:
// - If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst with a rate within an unlimited capacity).
// - If b == -1, that means the limiter is unlimited capacity and fillrate(r) is ignored, can be seen as r == Inf (burst within an unlimited capacity).
// - If b == -2, that means the limiter is limited capacity and fillrate(r) is ignored, can be seen as r == defaultBurstLimitFactor * fillrate (burst within a limited capacity).
// - If b > 0, that means the limiter is limited capacity.
burst := settings.GetBurstLimit()
switch {
case burst == -1:
return unlimited
case burst == -2:
return moderated
case burst == 0:
return rateControlled
case burst > 0:
return limited
default:
log.Warn("invalid burst limit, fallback to limited mode",
zap.Int64("burst-limit", burst))
return limited

Check warning on line 72 in pkg/mcs/resourcemanager/server/token_buckets.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/token_buckets.go#L69-L72

Added lines #L69 - L72 were not covered by tests
}
}

// GroupTokenBucket is a token bucket for a resource group.
// Now we don't save consumption in `GroupTokenBucket`, only statistics it in prometheus.
type GroupTokenBucket struct {
// Settings is the setting of TokenBucket.
// BurstLimit is used as below:
// - If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst with a rate within an unlimited capacity).
// - If b < 0, that means the limiter is unlimited capacity and fillrate(r) is ignored, can be seen as r == Inf (burst within an unlimited capacity).
// - If b > 0, that means the limiter is limited capacity.
// MaxTokens limits the number of tokens that can be accumulated
Settings *rmpb.TokenLimitSettings `json:"settings,omitempty"`
GroupTokenBucketState `json:"state,omitempty"`
Expand Down Expand Up @@ -152,7 +184,9 @@
// Only slots that require a positive number will be considered alive,
// but still need to allocate the elapsed tokens as well.
if requiredToken != 0 {
slot = &TokenSlot{lastReqTime: now}
slot = &TokenSlot{
lastReqTime: now,
}
gts.tokenSlots[clientUniqueID] = slot
gts.clientConsumptionTokensSum = 0
}
Expand Down Expand Up @@ -182,7 +216,7 @@
return
}
evenRatio := 1 / float64(len(gts.tokenSlots))
if settings.GetBurstLimit() <= 0 {
if getBurstableMode(settings) == rateControlled {
for _, slot := range gts.tokenSlots {
slot.settings = &rmpb.TokenLimitSettings{
FillRate: uint64(float64(settings.GetFillRate()) * evenRatio),
Expand All @@ -191,6 +225,15 @@
}
return
}
if getBurstableMode(settings) == unlimited || settings.FillRate == unlimitedRate {
for _, slot := range gts.tokenSlots {
slot.settings = &rmpb.TokenLimitSettings{
FillRate: unlimitedRate,
BurstLimit: unlimitedBurstLimit,
}
}
return
}

for _, slot := range gts.tokenSlots {
if gts.clientConsumptionTokensSum == 0 || len(gts.tokenSlots) == 1 {
Expand All @@ -200,11 +243,7 @@
slot.requireTokensSum = 0
gts.clientConsumptionTokensSum = 0

var (
fillRate = float64(settings.GetFillRate()) * evenRatio
burstLimit = float64(settings.GetBurstLimit()) * evenRatio
)

fillRate, burstLimit := calcRateAndBurstLimit(settings, evenRatio)
slot.settings = &rmpb.TokenLimitSettings{
FillRate: uint64(fillRate),
BurstLimit: int64(burstLimit),
Expand All @@ -220,11 +259,8 @@
// (N - (a+b+...+n)/N +1) * 1/N => (N - 1 + 1) * 1/N => 1
ratio := (1 - slot.requireTokensSum/gts.clientConsumptionTokensSum + evenRatio) * evenRatio

var (
fillRate = float64(settings.GetFillRate()) * ratio
burstLimit = float64(settings.GetBurstLimit()) * ratio
assignToken = elapseTokens * ratio
)
assignToken := elapseTokens * ratio
fillRate, burstLimit := calcRateAndBurstLimit(settings, ratio)

Check warning on line 263 in pkg/mcs/resourcemanager/server/token_buckets.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/token_buckets.go#L262-L263

Added lines #L262 - L263 were not covered by tests

// Need to reserve burst limit to next balance.
if burstLimit > 0 && slot.tokenCapacity > burstLimit {
Expand All @@ -249,6 +285,17 @@
}
}

func calcRateAndBurstLimit(settings *rmpb.TokenLimitSettings, ratio float64) (fillRate, burstLimit float64) {
if getBurstableMode(settings) == moderated {
fillRate = float64(settings.GetFillRate()) * ratio * float64(defaultBurstLimitFactor)
burstLimit = fillRate
return
}
fillRate = float64(settings.GetFillRate()) * ratio
burstLimit = float64(settings.GetBurstLimit()) * ratio
return
}

// NewGroupTokenBucket returns a new GroupTokenBucket
func NewGroupTokenBucket(tokenBucket *rmpb.TokenBucket) *GroupTokenBucket {
if tokenBucket == nil || tokenBucket.Settings == nil {
Expand Down Expand Up @@ -356,8 +403,7 @@
var res rmpb.TokenBucket
burstLimit := ts.settings.GetBurstLimit()
res.Settings = &rmpb.TokenLimitSettings{BurstLimit: burstLimit}
// If BurstLimit < 0, just return.
if burstLimit < 0 {
if getBurstableMode(res.Settings) == unlimited {
res.Tokens = requiredToken
return &res, 0
}
Expand Down
84 changes: 84 additions & 0 deletions pkg/mcs/resourcemanager/server/token_buckets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,90 @@ func TestGroupTokenBucketRequest(t *testing.T) {
re.Equal(int64(time.Second)*10/int64(time.Millisecond), trickle)
}

func TestGroupTokenBucketRequestBurstLimit(t *testing.T) {
re := require.New(t)
testGroupSetting := func(tbSetting *rmpb.TokenBucket, expectedFillRate, expectedBurstLimit int64) {
gtb := NewGroupTokenBucket(tbSetting)
time1 := time.Now()
clientUniqueID := uint64(0)
gtb.request(time1, 190000, uint64(time.Second)*10/uint64(time.Millisecond), clientUniqueID)
re.Contains(gtb.tokenSlots, clientUniqueID)
// it should not be able to change group settings
groupSetting := gtb.tokenSlots[clientUniqueID]
re.Equal(expectedBurstLimit, groupSetting.settings.BurstLimit)
re.Equal(uint64(expectedFillRate), groupSetting.settings.FillRate)
// it should not be able to change gtb settings
re.Equal(tbSetting.GetSettings().BurstLimit, gtb.Settings.BurstLimit)
re.Equal(tbSetting.GetSettings().FillRate, gtb.Settings.FillRate)
}

// case 1: fillrate = 2000, burstLimit = 2000,0,-1,-2
testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: 2000,
BurstLimit: 2000,
},
}, 2000, 2000)

testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: 2000,
BurstLimit: 0,
},
}, 2000, 0)

testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: 2000,
BurstLimit: unlimitedBurstLimit,
},
}, unlimitedRate, unlimitedBurstLimit)

testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: 2000,
BurstLimit: -2,
},
}, 2000*defaultBurstLimitFactor, 2000*defaultBurstLimitFactor)

// case 2: fillrate = unlimited, burstLimit = 2000,0,-1,-2
testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: unlimitedRate,
BurstLimit: 2000,
},
}, unlimitedRate, unlimitedBurstLimit)

testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: unlimitedRate,
BurstLimit: 0,
},
}, unlimitedRate, 0) // burstLimit = 0 is a special case

testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: unlimitedRate,
BurstLimit: unlimitedBurstLimit,
},
}, unlimitedRate, unlimitedBurstLimit)

testGroupSetting(&rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: unlimitedRate,
BurstLimit: -2,
},
}, unlimitedRate, unlimitedBurstLimit)
}

func TestGroupTokenBucketRequestLoop(t *testing.T) {
re := require.New(t)
tbSetting := &rmpb.TokenBucket{
Expand Down