From 31eb2120bf87ca5fc035b808eb9023b41d9bfa3f Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 28 Feb 2025 10:11:33 -0300 Subject: [PATCH] fix(libbeat): mitigate race condition in ratelimit processor --- ...ce-conditions-in-rate-limit-processor.yaml | 32 ++++++++++++ .../processors/ratelimit/rate_limit_test.go | 13 +++++ libbeat/processors/ratelimit/token_bucket.go | 51 +++++++++++-------- 3 files changed, 74 insertions(+), 22 deletions(-) create mode 100644 changelog/fragments/1740762770-fix-race-conditions-in-rate-limit-processor.yaml diff --git a/changelog/fragments/1740762770-fix-race-conditions-in-rate-limit-processor.yaml b/changelog/fragments/1740762770-fix-race-conditions-in-rate-limit-processor.yaml new file mode 100644 index 000000000000..200ef3ff579a --- /dev/null +++ b/changelog/fragments/1740762770-fix-race-conditions-in-rate-limit-processor.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; an 80ish-character-long description of the change. +summary: fix race condition in rate limit processor that could cause misbehavior + +# Long description; in case the summary is not enough to describe the change +# this field accommodates a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. 
+#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: filebeat + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/beats/42966 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/libbeat/processors/ratelimit/rate_limit_test.go b/libbeat/processors/ratelimit/rate_limit_test.go index f2f3dd2ffcf4..001512d75a08 100644 --- a/libbeat/processors/ratelimit/rate_limit_test.go +++ b/libbeat/processors/ratelimit/rate_limit_test.go @@ -176,3 +176,16 @@ func TestRateLimit(t *testing.T) { }) } } + +func BenchmarkRateLimit(b *testing.B) { + p, err := new(conf.MustNewConfigFrom(mapstr.M{ + "limit": "100/s", + })) + require.NoError(b, err) + event := beat.Event{Fields: mapstr.M{"field": 1}} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + p.Run(&event) + } +} diff --git a/libbeat/processors/ratelimit/token_bucket.go b/libbeat/processors/ratelimit/token_bucket.go index 1e84f799b986..f42e51d4a850 100644 --- a/libbeat/processors/ratelimit/token_bucket.go +++ b/libbeat/processors/ratelimit/token_bucket.go @@ -35,10 +35,36 @@ func init() { } type bucket struct { + mu sync.Mutex + tokens float64 lastReplenish time.Time } +func (b *bucket) withdraw() bool { + b.mu.Lock() + defer b.mu.Unlock() + + if b.tokens < 1 { + return false + } + + b.tokens-- + return true +} + +func (b *bucket) replenish(rate rate, clock clockwork.Clock) float64 { + 
b.mu.Lock() + defer b.mu.Unlock() + + secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds() + tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond() + + b.tokens += tokensToReplenish + b.lastReplenish = clock.Now() + return b.tokens +} + type tokenBucket struct { mu unison.Mutex @@ -128,31 +154,13 @@ func (t *tokenBucket) getBucket(key uint64) *bucket { }) //nolint:errcheck // ignore b := v.(*bucket) - if exists { b.replenish(t.limit, t.clock) - return b } return b } -func (b *bucket) withdraw() bool { - if b.tokens < 1 { - return false - } - b.tokens-- - return true -} - -func (b *bucket) replenish(rate rate, clock clockwork.Clock) { - secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds() - tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond() - - b.tokens += tokensToReplenish - b.lastReplenish = clock.Now() -} - func (t *tokenBucket) runGC() { // Don't run GC if thresholds haven't been crossed. if t.gc.metrics.numCalls.Load() < uint64(t.gc.thresholds.NumCalls) { @@ -177,9 +185,8 @@ func (t *tokenBucket) runGC() { //nolint:errcheck // ignore b := v.(*bucket) - b.replenish(t.limit, t.clock) - - if b.tokens >= t.depth { + tokens := b.replenish(t.limit, t.clock) + if tokens >= t.depth { toDelete = append(toDelete, key) } @@ -193,7 +200,7 @@ func (t *tokenBucket) runGC() { } // Reset GC metrics - t.gc.metrics.numCalls = atomic.Uint64{} + t.gc.metrics.numCalls.Store(0) gcDuration := time.Since(gcStartTime) numBucketsDeleted := len(toDelete)