-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathratelimiter.go
134 lines (111 loc) · 4.09 KB
/
ratelimiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package qpool
import (
"sync"
"time"
)
/*
RateLimiter implements the Regulator interface using a token bucket algorithm.
It controls the rate of operations by maintaining a bucket of tokens that are consumed
by each operation and replenished at a fixed rate.
Like a water tank with a steady inflow and controlled outflow, this regulator ensures
that operations occur at a sustainable rate, preventing system overload while allowing
for brief bursts of activity when the token bucket is full.
Key features:
- Smooth rate limiting with burst capacity
- Configurable token replenishment rate
- Thread-safe operation
- Metric-aware for adaptive rate limiting
*/
type RateLimiter struct {
tokens int // Current number of available tokens
maxTokens int // Maximum token capacity
refillRate time.Duration // Time between token replenishments
lastRefill time.Time // Last time tokens were added
mu sync.Mutex // Ensures thread-safe access to tokens
metrics *Metrics // System metrics for adaptive behavior
}
/*
NewRateLimiter creates a new rate limit regulator with specified parameters.
Parameters:
- maxTokens: Maximum number of tokens (burst capacity)
- refillRate: Duration between token replenishments
Returns:
- *RateLimiter: A new rate limit regulator instance
Example:
limiter := NewRateLimiter(100, time.Second) // 100 ops/second with burst capacity
*/
func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {
now := time.Now()
return &RateLimiter{
tokens: maxTokens,
maxTokens: maxTokens,
refillRate: refillRate,
lastRefill: now.Add(-refillRate), // Start with a full refill period elapsed
}
}
/*
Observe implements the Regulator interface by monitoring system metrics.
The rate limiter can use these metrics to dynamically adjust its rate limits
based on system conditions.
For example, it might:
- Reduce rates during high system load
- Increase limits when resources are abundant
- Adjust burst capacity based on queue length
Parameters:
- metrics: Current system metrics including performance and health indicators
*/
func (rl *RateLimiter) Observe(metrics *Metrics) {
rl.metrics = metrics
}
/*
Limit implements the Regulator interface by determining if an operation should be limited.
It consumes a token if available, allowing the operation to proceed. If no tokens
are available, the operation is limited.
Returns:
- bool: true if the operation should be limited, false if it can proceed
Thread-safety: This method is thread-safe through mutex protection.
*/
func (rl *RateLimiter) Limit() bool {
rl.mu.Lock()
defer rl.mu.Unlock()
rl.refill()
if rl.tokens > 0 {
rl.tokens--
return false // Don't limit
}
return true // Limit
}
/*
Renormalize implements the Regulator interface by attempting to restore normal operation.
This method triggers a token refill, potentially allowing more operations to proceed
if enough time has passed since the last refill.
The rate limiter uses this method to maintain a steady flow of operations while
adhering to the configured rate limits.
*/
func (rl *RateLimiter) Renormalize() {
rl.mu.Lock()
defer rl.mu.Unlock()
rl.refill()
}
/*
refill adds tokens to the bucket based on elapsed time.
This is an internal method that implements the token bucket algorithm's
replenishment logic.
The number of tokens added is proportional to the time elapsed since the last
refill, up to the maximum capacity of the bucket.
Thread-safety: This method assumes the caller holds the mutex lock.
*/
func (rl *RateLimiter) refill() {
now := time.Now()
elapsed := now.Sub(rl.lastRefill)
// Convert to nanoseconds for integer division
elapsedNs := elapsed.Nanoseconds()
refillRateNs := rl.refillRate.Nanoseconds()
// Calculate tokens to add - only round up if we're at least halfway through a period
tokensToAdd := (elapsedNs + (refillRateNs / 2)) / refillRateNs
if tokensToAdd > 0 {
rl.tokens = Min(rl.maxTokens, rl.tokens+int(tokensToAdd))
// Only move lastRefill forward by the number of complete periods
rl.lastRefill = rl.lastRefill.Add(time.Duration(tokensToAdd) * rl.refillRate)
}
}