-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy path00-redis-pool.go
169 lines (152 loc) · 3.87 KB
/
00-redis-pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package redispool
import (
"sync/atomic"
"time"
"github.com/fzzy/radix/redis"
)
// A simple connection pool. It will create a small pool of initial connections,
// and if more connections are needed they will be created on demand. If a
// connection is returned and the pool is full it will be closed.
type RedisPool struct {
pwd string
network string
addr string
pool chan *redis.Client
qpsLimit int32
qps int32
}
const (
DEFAULT_QPS_LIMIT = 2000
)
func (p *RedisPool) getRedisClient() (*redis.Client, error) {
client, err := redis.Dial(p.network, p.addr)
if err != nil {
return nil, err
}
if len(p.pwd) != 0 {
if client, err = p.redisAuth(client); err != nil {
return nil, err
}
}
return client, nil
}
func (p *RedisPool) redisAuth(client *redis.Client) (*redis.Client, error) {
if _, err := client.Cmd("AUTH", p.pwd).Str(); err != nil {
client.Close()
return client, err
}
return client, nil
}
// Creates a new Pool whose connections are all created using
// redis.Dial(network, addr). The size indicates the maximum number of idle
// connections to have waiting to be used at any given moment
func NewPool(network, addr, password string, size int) (*RedisPool, error) {
if size <= 0 {
size = 1
}
rp := RedisPool{
network: network,
addr: addr,
pool: make(chan *redis.Client, size),
pwd: password,
qpsLimit: int32(DEFAULT_QPS_LIMIT),
qps: int32(0),
}
client_list := make([]*redis.Client, 0, size)
for i := 0; i < size; i++ {
client, err := rp.getRedisClient()
if err != nil {
for _, client = range client_list {
client.Close()
}
return nil, err
}
if client != nil {
client_list = append(client_list, client)
}
}
for i := range client_list {
rp.pool <- client_list[i]
}
return &rp, nil
}
// Calls NewPool, but if there is an error it return a pool of the same size but
// without any connections pre-initialized (can be used the same way, but if
// this happens there might be something wrong with the redis instance you're
// connecting to)
func NewOrEmptyPool(network, addr, password string, size int) *RedisPool {
pool, err := NewPool(network, addr, password, size)
if err != nil {
if size <= 0 {
size = 1
}
pool = &RedisPool{
network: network,
addr: addr,
pool: make(chan *redis.Client, size),
pwd: password,
qpsLimit: int32(DEFAULT_QPS_LIMIT),
qps: int32(0),
}
}
return pool
}
func (p *RedisPool) get() (client *redis.Client, err error) {
select {
case conn := <-p.pool:
return conn, nil
default:
return p.getRedisClient()
}
}
// Retrieves an available redis client. If there are none available it will
// create a new one on the fly
func (p *RedisPool) Get() (client *redis.Client, err error) {
if p.qpsLimit > 0 {
for p.qps > p.qpsLimit {
time.Sleep(time.Millisecond * time.Duration(10))
}
}
for i := 0; i < 3; i++ {
if client, err = p.get(); err != nil {
break
} else if pstate, perr := client.Cmd("PING").Str(); pstate == "PONG" && perr == nil {
break
}
}
if err == nil {
atomic.AddInt32(&p.qps, 1)
}
return
}
// Returns a client back to the pool. If the pool is full the client is closed
// instead. If the client is already closed (due to connection failure or
// what-have-you) it should not be put back in the pool. The pool will create
// more connections as needed.
func (p *RedisPool) Put(conn *redis.Client) {
select {
case p.pool <- conn:
default:
conn.Close()
}
if p.qps > 0 {
atomic.AddInt32(&p.qps, -1)
}
}
// Removes and calls Close() on all the connections currently in the pool.
// Assuming there are no other connections waiting to be Put back this method
// effectively closes and cleans up the pool.
func (p *RedisPool) Empty() {
var conn *redis.Client
for {
select {
case conn = <-p.pool:
conn.Close()
default:
return
}
if p.qps > 0 {
atomic.AddInt32(&p.qps, -1)
}
}
}