fix(libbeat): mitigate race condition in ratelimit processor
mauri870 committed Feb 28, 2025
1 parent c0fdd97 commit 2a24573
Showing 3 changed files with 74 additions and 22 deletions.
32 changes: 32 additions & 0 deletions (new changelog fragment)
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; an 80-ish character description of the change.
summary: Fix race conditions in the ratelimit processor that could cause misbehavior

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: The race condition could cause events to be dropped or not rate-limited as expected when a single global ratelimit processor is shared by multiple inputs.

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: filebeat

# PR URL; optional; the PR number that added the changeset.
# If not present, it is automatically filled by the tooling by finding the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/beats/pull/42966

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR.
#issue: https://github.com/owner/repo/1234
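
For background on the race being fixed: the sketch below is standalone, illustrative Go only (not Beats code; the struct and numbers are made up) and shows why a token bucket shared by a global processor across several inputs needs the per-bucket mutex this commit adds. Without the lock, the check-then-decrement in withdraw is not atomic and the effective limit can drift.

// racy_bucket_sketch.go: hypothetical, standalone illustration only.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// bucket mirrors the shape of the processor's per-key bucket; the mutex is
// the piece this commit adds.
type bucket struct {
	mu     sync.Mutex
	tokens float64
}

// withdraw takes one token if available. Holding the mutex makes the
// check-then-decrement atomic; without it, two goroutines can both observe
// tokens == 1 and both succeed.
func (b *bucket) withdraw() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	b := &bucket{tokens: 100} // pretend "limit: 100" with no replenishment

	var allowed atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // eight "inputs" sharing one global processor
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				if b.withdraw() {
					allowed.Add(1)
				}
			}
		}()
	}
	wg.Wait()

	// With the mutex this prints 100; with the locking removed, `go run -race`
	// reports a data race and the count can drift away from the limit.
	fmt.Println("events allowed:", allowed.Load())
}
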
13 changes: 13 additions & 0 deletions libbeat/processors/ratelimit/rate_limit_test.go
@@ -176,3 +176,16 @@ func TestRateLimit(t *testing.T) {
		})
	}
}

func BenchmarkRateLimit(b *testing.B) {
	p, err := new(conf.MustNewConfigFrom(mapstr.M{
		"limit": "100/s",
	}))
	require.NoError(b, err)
	event := beat.Event{Fields: mapstr.M{"field": 1}}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		p.Run(&event)
	}
}

Check failure on line 189 in libbeat/processors/ratelimit/rate_limit_test.go, GitHub Actions / lint (windows): Error return value of `p.Run` is not checked (errcheck)
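
The errcheck finding could be addressed by discarding the processor's return values explicitly. This is only a sketch of one option, not necessarily how it was resolved upstream:

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Run returns (*beat.Event, error); the benchmark only measures
		// throughput, so both values are intentionally discarded.
		_, _ = p.Run(&event)
	}
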
51 changes: 29 additions & 22 deletions libbeat/processors/ratelimit/token_bucket.go
@@ -35,10 +35,36 @@ func init() {
}

type bucket struct {
	mu sync.Mutex

	tokens        float64
	lastReplenish time.Time
}

func (b *bucket) withdraw() bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.tokens < 1 {
		return false
	}

	b.tokens--
	return true
}

func (b *bucket) replenish(rate rate, clock clockwork.Clock) float64 {
	b.mu.Lock()
	defer b.mu.Unlock()

	secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds()
	tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond()

	b.tokens += tokensToReplenish
	b.lastReplenish = clock.Now()
	return b.tokens
}

type tokenBucket struct {
	mu unison.Mutex

@@ -128,31 +154,13 @@ func (t *tokenBucket) getBucket(key uint64) *bucket {
	})
	//nolint:errcheck // ignore
	b := v.(*bucket)

	if exists {
		b.replenish(t.limit, t.clock)
		return b
	}

	return b
}

func (b *bucket) withdraw() bool {
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func (b *bucket) replenish(rate rate, clock clockwork.Clock) {
	secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds()
	tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond()

	b.tokens += tokensToReplenish
	b.lastReplenish = clock.Now()
}

func (t *tokenBucket) runGC() {
	// Don't run GC if thresholds haven't been crossed.
	if t.gc.metrics.numCalls.Load() < uint64(t.gc.thresholds.NumCalls) {
@@ -177,9 +185,8 @@ func (t *tokenBucket) runGC() {
		//nolint:errcheck // ignore
		b := v.(*bucket)

		b.replenish(t.limit, t.clock)

		if b.tokens >= t.depth {
		tokens := b.replenish(t.limit, t.clock)
		if tokens >= t.depth {
			toDelete = append(toDelete, key)
		}

@@ -193,7 +200,7 @@ func (t *tokenBucket) runGC() {
}

	// Reset GC metrics
	t.gc.metrics.numCalls = atomic.Uint64{}
	t.gc.metrics.numCalls.Store(0)

	gcDuration := time.Since(gcStartTime)
	numBucketsDeleted := len(toDelete)
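
A regression test along the following lines could exercise the new per-bucket locking under the race detector. It is an illustrative sketch only, not part of this commit: the test name and constants are hypothetical, and it assumes the bucket type from token_bucket.go above plus the sync, testing, and time imports in the test file.

// Hypothetical test for libbeat/processors/ratelimit; run with `go test -race`.
func TestBucketConcurrentWithdraw(t *testing.T) {
	b := &bucket{tokens: 1000, lastReplenish: time.Now()}

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // simulate several inputs sharing one global processor
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				b.withdraw()
			}
		}()
	}
	wg.Wait()

	// 8 goroutines * 100 withdrawals against 1000 tokens: with correct locking
	// exactly 800 tokens are consumed and the race detector stays quiet.
	if got := b.tokens; got != 200 {
		t.Fatalf("unexpected remaining tokens: got %v, want 200", got)
	}
}
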
