diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 451b2ff6f4bc..713fd48da43c 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -67,6 +67,7 @@ Dropped fields: `syslog.priority` and `syslog.facility` while keeping their dupl
 already present as `container.id` and `container.log.tag` is dropped because it is
 already present as `log.syslog.appname`. The field `container.partial` is replaced
 by the tag `partial_message` if it was `true`, otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
+- Fixed race conditions in the global ratelimit processor that could drop events or apply rate limiting incorrectly.
 
 *Heartbeat*
 
diff --git a/libbeat/processors/ratelimit/rate_limit_test.go b/libbeat/processors/ratelimit/rate_limit_test.go
index f2f3dd2ffcf4..8a739fb22f15 100644
--- a/libbeat/processors/ratelimit/rate_limit_test.go
+++ b/libbeat/processors/ratelimit/rate_limit_test.go
@@ -176,3 +176,16 @@ func TestRateLimit(t *testing.T) {
 		})
 	}
 }
+
+func BenchmarkRateLimit(b *testing.B) {
+	p, err := new(conf.MustNewConfigFrom(mapstr.M{
+		"limit": "100/s",
+	}))
+	require.NoError(b, err)
+	event := beat.Event{Fields: mapstr.M{"field": 1}}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		p.Run(&event) //nolint:errcheck // ignore
+	}
+}
diff --git a/libbeat/processors/ratelimit/token_bucket.go b/libbeat/processors/ratelimit/token_bucket.go
index 1e84f799b986..78a4e1d6b498 100644
--- a/libbeat/processors/ratelimit/token_bucket.go
+++ b/libbeat/processors/ratelimit/token_bucket.go
@@ -35,10 +35,36 @@ func init() {
 }
 
 type bucket struct {
+	mu sync.Mutex
+
 	tokens        float64
 	lastReplenish time.Time
 }
 
+func (b *bucket) withdraw() bool {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if b.tokens < 1 {
+		return false
+	}
+
+	b.tokens--
+	return true
+}
+
+func (b *bucket) replenish(rate rate, clock clockwork.Clock) float64 {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds()
+	tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond()
+
+	b.tokens += tokensToReplenish
+	b.lastReplenish = clock.Now()
+	return b.tokens
+}
+
 type tokenBucket struct {
 	mu unison.Mutex
 
@@ -122,35 +148,20 @@ func (t *tokenBucket) setClock(c clockwork.Clock) {
 }
 
 func (t *tokenBucket) getBucket(key uint64) *bucket {
-	v, exists := t.buckets.LoadOrStore(key, &bucket{
-		tokens:        t.depth,
-		lastReplenish: t.clock.Now(),
-	})
-	//nolint:errcheck // ignore
-	b := v.(*bucket)
-
+	v, exists := t.buckets.Load(key)
 	if exists {
+		//nolint:errcheck // ignore
+		b := v.(*bucket)
 		b.replenish(t.limit, t.clock)
 		return b
 	}
 
-	return b
-}
-
-func (b *bucket) withdraw() bool {
-	if b.tokens < 1 {
-		return false
-	}
-	b.tokens--
-	return true
-}
-
-func (b *bucket) replenish(rate rate, clock clockwork.Clock) {
-	secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds()
-	tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond()
-
-	b.tokens += tokensToReplenish
-	b.lastReplenish = clock.Now()
+	v, _ = t.buckets.LoadOrStore(key, &bucket{
+		tokens:        t.depth,
+		lastReplenish: t.clock.Now(),
+	})
+	//nolint:errcheck // ignore
+	return v.(*bucket)
 }
 
 func (t *tokenBucket) runGC() {
@@ -177,9 +188,8 @@ func (t *tokenBucket) runGC() {
 			//nolint:errcheck // ignore
 			b := v.(*bucket)
 
-			b.replenish(t.limit, t.clock)
-
-			if b.tokens >= t.depth {
+			tokens := b.replenish(t.limit, t.clock)
+			if tokens >= t.depth {
 				toDelete = append(toDelete, key)
 			}
 
@@ -193,7 +203,7 @@ func (t *tokenBucket) runGC() {
 	}
 
 	// Reset GC metrics
-	t.gc.metrics.numCalls = atomic.Uint64{}
+	t.gc.metrics.numCalls.Store(0)
 
 	gcDuration := time.Since(gcStartTime)
 	numBucketsDeleted := len(toDelete)