Skip to content

Commit

Permalink
fix(libbeat): mitigate race condition in ratelimit processor
Browse files Browse the repository at this point in the history
  • Loading branch information
mauri870 committed Feb 28, 2025
1 parent c0fdd97 commit 92dac6e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 22 deletions.
13 changes: 13 additions & 0 deletions libbeat/processors/ratelimit/rate_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,16 @@ func TestRateLimit(t *testing.T) {
})
}
}

func BenchmarkRateLimit(b *testing.B) {
p, err := new(conf.MustNewConfigFrom(mapstr.M{
"limit": "100/s",
}))
require.NoError(b, err)
event := beat.Event{Fields: mapstr.M{"field": 1}}

b.ResetTimer()
for i := 0; i < b.N; i++ {
p.Run(&event)

Check failure on line 189 in libbeat/processors/ratelimit/rate_limit_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `p.Run` is not checked (errcheck)
}
}
51 changes: 29 additions & 22 deletions libbeat/processors/ratelimit/token_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,36 @@ func init() {
}

type bucket struct {
mu sync.Mutex

tokens float64
lastReplenish time.Time
}

func (b *bucket) withdraw() bool {
b.mu.Lock()
defer b.mu.Unlock()

if b.tokens < 1 {
return false
}

b.tokens--
return true
}

func (b *bucket) replenish(rate rate, clock clockwork.Clock) float64 {
b.mu.Lock()
defer b.mu.Unlock()

secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds()
tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond()

b.tokens += tokensToReplenish
b.lastReplenish = clock.Now()
return b.tokens
}

type tokenBucket struct {
mu unison.Mutex

Expand Down Expand Up @@ -128,31 +154,13 @@ func (t *tokenBucket) getBucket(key uint64) *bucket {
})
//nolint:errcheck // ignore
b := v.(*bucket)

if exists {
b.replenish(t.limit, t.clock)
return b
}

return b
}

func (b *bucket) withdraw() bool {
if b.tokens < 1 {
return false
}
b.tokens--
return true
}

func (b *bucket) replenish(rate rate, clock clockwork.Clock) {
secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds()
tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond()

b.tokens += tokensToReplenish
b.lastReplenish = clock.Now()
}

func (t *tokenBucket) runGC() {
// Don't run GC if thresholds haven't been crossed.
if t.gc.metrics.numCalls.Load() < uint64(t.gc.thresholds.NumCalls) {
Expand All @@ -177,9 +185,8 @@ func (t *tokenBucket) runGC() {
//nolint:errcheck // ignore
b := v.(*bucket)

b.replenish(t.limit, t.clock)

if b.tokens >= t.depth {
tokens := b.replenish(t.limit, t.clock)
if tokens >= t.depth {
toDelete = append(toDelete, key)
}

Expand All @@ -193,7 +200,7 @@ func (t *tokenBucket) runGC() {
}

// Reset GC metrics
t.gc.metrics.numCalls = atomic.Uint64{}
t.gc.metrics.numCalls.Store(0)

gcDuration := time.Since(gcStartTime)
numBucketsDeleted := len(toDelete)
Expand Down

0 comments on commit 92dac6e

Please sign in to comment.