diff --git a/libbeat/processors/ratelimit/rate_limit_test.go b/libbeat/processors/ratelimit/rate_limit_test.go index f2f3dd2ffcf4..001512d75a08 100644 --- a/libbeat/processors/ratelimit/rate_limit_test.go +++ b/libbeat/processors/ratelimit/rate_limit_test.go @@ -176,3 +176,16 @@ func TestRateLimit(t *testing.T) { }) } } + +func BenchmarkRateLimit(b *testing.B) { + p, err := new(conf.MustNewConfigFrom(mapstr.M{ + "limit": "100/s", + })) + require.NoError(b, err) + event := beat.Event{Fields: mapstr.M{"field": 1}} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + p.Run(&event) + } +} diff --git a/libbeat/processors/ratelimit/token_bucket.go b/libbeat/processors/ratelimit/token_bucket.go index 1e84f799b986..f42e51d4a850 100644 --- a/libbeat/processors/ratelimit/token_bucket.go +++ b/libbeat/processors/ratelimit/token_bucket.go @@ -35,10 +35,36 @@ func init() { } type bucket struct { + mu sync.Mutex + tokens float64 lastReplenish time.Time } +func (b *bucket) withdraw() bool { + b.mu.Lock() + defer b.mu.Unlock() + + if b.tokens < 1 { + return false + } + + b.tokens-- + return true +} + +func (b *bucket) replenish(rate rate, clock clockwork.Clock) float64 { + b.mu.Lock() + defer b.mu.Unlock() + + secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds() + tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond() + + b.tokens += tokensToReplenish + b.lastReplenish = clock.Now() + return b.tokens +} + type tokenBucket struct { mu unison.Mutex @@ -128,31 +154,13 @@ func (t *tokenBucket) getBucket(key uint64) *bucket { }) //nolint:errcheck // ignore b := v.(*bucket) - if exists { b.replenish(t.limit, t.clock) - return b } return b } -func (b *bucket) withdraw() bool { - if b.tokens < 1 { - return false - } - b.tokens-- - return true -} - -func (b *bucket) replenish(rate rate, clock clockwork.Clock) { - secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds() - tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond() - - b.tokens += tokensToReplenish - b.lastReplenish = clock.Now() -} - func (t *tokenBucket) runGC() { // Don't run GC if thresholds haven't been crossed. if t.gc.metrics.numCalls.Load() < uint64(t.gc.thresholds.NumCalls) { @@ -177,9 +185,8 @@ func (t *tokenBucket) runGC() { //nolint:errcheck // ignore b := v.(*bucket) - b.replenish(t.limit, t.clock) - - if b.tokens >= t.depth { + tokens := b.replenish(t.limit, t.clock) + if tokens >= t.depth { toDelete = append(toDelete, key) } @@ -193,7 +200,7 @@ func (t *tokenBucket) runGC() { } // Reset GC metrics - t.gc.metrics.numCalls = atomic.Uint64{} + t.gc.metrics.numCalls.Store(0) gcDuration := time.Since(gcStartTime) numBucketsDeleted := len(toDelete)