Skip to content

Commit

Permalink
Redis Cluster support
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Grondin committed Aug 20, 2018
1 parent d8aef84 commit e7d9fe4
Show file tree
Hide file tree
Showing 20 changed files with 207 additions and 111 deletions.
35 changes: 20 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,13 @@ Clustering lets many limiters access the same shared state, stored in a Redis se

##### Enabling Clustering

__IMPORTANT:__ Add `redis` to your application's dependencies.
```
__IMPORTANT:__ Add `redis` or `ioredis` to your application's dependencies.
```bash
# To use https://github.com/NodeRedis/node_redis
npm install --save redis

# To use https://github.com/luin/ioredis
npm install --save ioredis
```

```js
Expand All @@ -485,11 +489,13 @@ const limiter = new Bottleneck({
id: "my-super-app" // Should be unique for every limiter in the same Redis db

/* Clustering options */
datastore: "redis",
datastore: "redis", // or "ioredis"
clearDatastore: false,
clientOptions: {
// node-redis client options, passed to redis.createClient()
// See https://github.com/NodeRedis/node_redis#options-object-properties
// Redis client options
// For NodeRedis, see https://github.com/NodeRedis/node_redis#options-object-properties
// For ioredis, see https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options

host: "127.0.0.1",
port: 6379
// "db" is another useful option
Expand All @@ -499,9 +505,10 @@ const limiter = new Bottleneck({

| Option | Default | Description |
|--------|---------|-------------|
| `datastore` | `"local"` | Where the limiter stores its internal state. The default (`local`) keeps the state in the limiter itself. Set it to `redis` to enable Clustering. |
| `datastore` | `"local"` | Where the limiter stores its internal state. The default (`"local"`) keeps the state in the limiter itself. Set it to `"redis"` or `"ioredis"` to enable Clustering. |
| `clearDatastore` | `false` | When set to `true`, on initial startup, the limiter will wipe any existing Bottleneck state data on the Redis db. |
| `clientOptions` | `{}` | This object is passed directly to NodeRedis's `redis.createClient()` method. [See all the valid client options.](https://github.com/NodeRedis/node_redis#options-object-properties) |
| `clientOptions` | `{}` | This object is passed directly to the redis client library you've selected. |
| `clusterNodes` | `null` | **ioredis only.** When `clusterNodes` is not null, the client will be instantiated by calling `new Redis.Cluster(clusterNodes, clientOptions)` instead of `new Redis(clientOptions)`. |
| `timeout` | `null` | The Redis TTL in milliseconds ([TTL](https://redis.io/commands/ttl)) for the keys created by the limiter. When `timeout` is set, the limiter's state will be automatically removed from Redis after `timeout` milliseconds of inactivity. **Note:** `timeout` is `300000` (5 minutes) by default when using a Group. |

###### `.ready()`
Expand All @@ -520,11 +527,11 @@ limiter.ready()
})
```

The `.ready()` method also exists when using the `local` datastore, for code compatibility reasons: code written for `redis` will always work with `local`.
The `.ready()` method also exists when using the `local` datastore, for code compatibility reasons: code written for `redis`/`ioredis` will also work with `local`.

###### `.disconnect(flush)`

This helper method calls the `.end(flush)` method on the Redis clients used by a limiter.
This helper method disconnects the limiter's client from the Redis server.

```js
limiter.disconnect(true)
Expand Down Expand Up @@ -591,8 +598,8 @@ clusterLimiter.ready()
- As of v2.7.0, each Group will create 2 connections to Redis, one for commands and one for pub/sub. All limiters within the Group will share those connections.
- Each standalone limiter has its own 2 connections.
- Redis connectivity errors trigger an `error` event on the owner of the connection (the Group or the limiter).
- Bottleneck itself is compatible with [Redis Clusters](https://redis.io/topics/cluster-tutorial), but `NodeRedis` may not support it at the moment. Future versions of Bottleneck will support the `ioredis` driver.
- Bottleneck's data is stored in Redis keys starting with `b_`. It also uses pub/sub channels starting with `b_` It will not interfere with any other data stored on the server.
- Bottleneck is compatible with [Redis Clusters](https://redis.io/topics/cluster-tutorial) and Redis Sentinel, but you must use the `ioredis` datastore and pass the `clusterNodes` option.
- Bottleneck's data is stored in Redis keys starting with `b_`. It also uses pub/sub channels starting with `bottleneck_` It will not interfere with any other data stored on the server.
- Bottleneck loads a few Lua scripts on the Redis server using the `SCRIPT LOAD` command. These scripts only take up a few Kb of memory. Running the `SCRIPT FLUSH` command will cause any connected limiters to experience critical errors until a new limiter connects to Redis and loads the scripts again.
- The Lua scripts are highly optimized and designed to use as few resources (CPU, especially) as possible.

Expand Down Expand Up @@ -753,13 +760,11 @@ This README is always in need of improvements. If wording can be clearer and sim

Suggestions and bug reports are also welcome.

To work on the Bottleneck code, simply clone the repo, and run `./scripts/build.sh && npm test` to ensure that everything is set up correctly.

Make your changes to the files located in `src/` only, then run `./scripts/build.sh && npm test` to build and test them.
To work on the Bottleneck code, simply clone the repo, makes your changes to the files located in `src/` only, then run `./scripts/build.sh && npm test` to ensure that everything is set up correctly.

To speed up compilation time during development, run `./scripts/build.sh dev` instead. Make sure to build and test without `dev` before submitting a PR.

The tests must also pass in Clustering mode. You'll need a Redis server running on `127.0.0.1:6379`, then run `./scripts/build.sh && DATASTORE=redis npm test`.
The tests must also pass in Clustering mode. You'll need a Redis server running on `127.0.0.1:6379`, then run `./scripts/build.sh && DATASTORE=redis npm test && DATASTORE=ioredis npm test`.

All contributions are appreciated and will be considered.

Expand Down
6 changes: 5 additions & 1 deletion bottleneck.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ declare module "bottleneck" {
*/
readonly Promise?: any;
/**
* This object is passed directly to NodeRedis's `redis.createClient()` method.
* This object is passed directly to the redis client library you've selected.
*/
readonly clientOptions?: any;
/**
* **ioredis only.** When `clusterNodes` is not null, the client will be instantiated by calling `new Redis.Cluster(clusterNodes, clientOptions)`.
*/
readonly clusterNodes?: any;
/**
* When set to `true`, on initial startup, the limiter will wipe any existing Bottleneck state data on the Redis db.
*/
Expand Down
6 changes: 5 additions & 1 deletion bottleneck.d.ts.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ declare module "bottleneck" {
*/
readonly Promise?: any;
/**
* This object is passed directly to NodeRedis's `redis.createClient()` method.
* This object is passed directly to the redis client library you've selected.
*/
readonly clientOptions?: any;
/**
* **ioredis only.** When `clusterNodes` is not null, the client will be instantiated by calling `new Redis.Cluster(clusterNodes, clientOptions)`.
*/
readonly clusterNodes?: any;
/**
* When set to `true`, on initial startup, the limiter will wipe any existing Bottleneck state data on the Redis db.
*/
Expand Down
46 changes: 30 additions & 16 deletions bottleneck.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a
var _this = this;

return _asyncToGenerator(function* () {
return yield _this._store.__disconnect__(flush);
yield _this._store.__disconnect__(flush);
return _this;
})();
}

Expand Down Expand Up @@ -580,6 +581,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a

Bottleneck.prototype.storeInstanceDefaults = {
clientOptions: {},
clusterNodes: null,
clearDatastore: false,
Promise: Promise,
timeout: null,
Expand Down Expand Up @@ -780,7 +782,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a
Group = function () {
class Group {
constructor(limiterOptions = {}) {
var ref, ref1, ref2, ref3;
var ref, ref1, ref2, ref3, ref4;
this.key = this.key.bind(this);
this.deleteKey = this.deleteKey.bind(this);
this.limiters = this.limiters.bind(this);
Expand All @@ -796,7 +798,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a
if (this.limiterOptions.datastore === "redis") {
this._connection = new RedisConnection((ref = this.limiterOptions.clientOptions) != null ? ref : {}, (ref1 = this.limiterOptions.Promise) != null ? ref1 : Promise, this.Events);
} else if (this.limiterOptions.datastore === "ioredis") {
this._connection = new IORedisConnection((ref2 = this.limiterOptions.clientOptions) != null ? ref2 : {}, (ref3 = this.limiterOptions.Promise) != null ? ref3 : Promise, this.Events);
this._connection = new IORedisConnection((ref2 = this.limiterOptions.clusterNodes) != null ? ref2 : null, (ref3 = this.limiterOptions.clientOptions) != null ? ref3 : {}, (ref4 = this.limiterOptions.Promise) != null ? ref4 : Promise, this.Events);
}
}

Expand Down Expand Up @@ -901,14 +903,20 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a
Scripts = require("./Scripts");

IORedisConnection = class IORedisConnection {
constructor(clientOptions, Promise, Events) {
constructor(clusterNodes, clientOptions, Promise, Events) {
var Redis;
this.clusterNodes = clusterNodes;
this.clientOptions = clientOptions;
this.Promise = Promise;
this.Events = Events;
Redis = eval("require")("ioredis"); // Obfuscated or else Webpack/Angular will try to inline the optional ioredis module
this.client = new Redis(this.clientOptions);
this.subClient = new Redis(this.clientOptions);
if (this.clusterNodes != null) {
this.client = new Redis.Cluster(this.clusterNodes, this.clientOptions);
this.subClient = new Redis.Cluster(this.clusterNodes, this.clientOptions);
} else {
this.client = new Redis(this.clientOptions);
this.subClient = new Redis(this.clientOptions);
}
this.pubsubs = {};
this.ready = new this.Promise((resolve, reject) => {
var count, done, errorListener;
Expand Down Expand Up @@ -981,8 +989,13 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a
}

disconnect(flush) {
this.client.end(flush);
return this.subClient.end(flush);
if (flush) {
return this.Promise.all([this.client.quit(), this.subClient.quit()]);
} else {
this.client.disconnect();
this.subClient.disconnect();
return this.Promise.resolve();
}
}

};
Expand Down Expand Up @@ -1016,7 +1029,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a
}

__disconnect__(flush) {
return this;
return this.Promise.resolve();
}

yieldLoop(t = 0) {
Expand Down Expand Up @@ -1194,13 +1207,13 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a

RedisConnection = class RedisConnection {
constructor(clientOptions, Promise, Events) {
var redis;
var Redis;
this.clientOptions = clientOptions;
this.Promise = Promise;
this.Events = Events;
redis = eval("require")("redis"); // Obfuscated or else Webpack/Angular will try to inline the optional redis module
this.client = redis.createClient(this.clientOptions);
this.subClient = redis.createClient(this.clientOptions);
Redis = eval("require")("redis"); // Obfuscated or else Webpack/Angular will try to inline the optional redis module
this.client = Redis.createClient(this.clientOptions);
this.subClient = Redis.createClient(this.clientOptions);
this.pubsubs = {};
this.shas = {};
this.ready = new this.Promise((resolve, reject) => {
Expand Down Expand Up @@ -1285,7 +1298,8 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a

disconnect(flush) {
this.client.end(flush);
return this.subClient.end(flush);
this.subClient.end(flush);
return this.Promise.resolve();
}

};
Expand Down Expand Up @@ -1320,11 +1334,11 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a
this.originalId = this.instance.id;
parser.load(options, options, this);
this.isReady = false;
this.connection = this._groupConnection ? this._groupConnection : this.instance.datastore === "redis" ? new RedisConnection(this.clientOptions, this.Promise, this.instance.Events) : this.instance.datastore === "ioredis" ? new IORedisConnection(this.clientOptions, this.Promise, this.instance.Events) : void 0;
this.connection = this._groupConnection ? this._groupConnection : this.instance.datastore === "redis" ? new RedisConnection(this.clientOptions, this.Promise, this.instance.Events) : this.instance.datastore === "ioredis" ? new IORedisConnection(this.clusterNodes, this.clientOptions, this.Promise, this.instance.Events) : void 0;
this.ready = this.connection.ready.then(clients => {
var args;
this.clients = clients;
args = this.prepareInitSettings(options.clearDatastore);
args = this.prepareInitSettings(this.clearDatastore);
this.isReady = true;
return this.runScript("init", args);
}).then(() => {
Expand Down
2 changes: 1 addition & 1 deletion bottleneck.min.js

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion lib/Bottleneck.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a
var _this = this;

return _asyncToGenerator(function* () {
return yield _this._store.__disconnect__(flush);
yield _this._store.__disconnect__(flush);
return _this;
})();
}

Expand Down Expand Up @@ -579,6 +580,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a

Bottleneck.prototype.storeInstanceDefaults = {
clientOptions: {},
clusterNodes: null,
clearDatastore: false,
Promise: Promise,
timeout: null,
Expand Down
4 changes: 2 additions & 2 deletions lib/Group.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a
Group = function () {
class Group {
constructor(limiterOptions = {}) {
var ref, ref1, ref2, ref3;
var ref, ref1, ref2, ref3, ref4;
this.key = this.key.bind(this);
this.deleteKey = this.deleteKey.bind(this);
this.limiters = this.limiters.bind(this);
Expand All @@ -33,7 +33,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a
if (this.limiterOptions.datastore === "redis") {
this._connection = new RedisConnection((ref = this.limiterOptions.clientOptions) != null ? ref : {}, (ref1 = this.limiterOptions.Promise) != null ? ref1 : Promise, this.Events);
} else if (this.limiterOptions.datastore === "ioredis") {
this._connection = new IORedisConnection((ref2 = this.limiterOptions.clientOptions) != null ? ref2 : {}, (ref3 = this.limiterOptions.Promise) != null ? ref3 : Promise, this.Events);
this._connection = new IORedisConnection((ref2 = this.limiterOptions.clusterNodes) != null ? ref2 : null, (ref3 = this.limiterOptions.clientOptions) != null ? ref3 : {}, (ref4 = this.limiterOptions.Promise) != null ? ref4 : Promise, this.Events);
}
}

Expand Down
21 changes: 16 additions & 5 deletions lib/IORedisConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@
Scripts = require("./Scripts");

IORedisConnection = class IORedisConnection {
constructor(clientOptions, Promise, Events) {
constructor(clusterNodes, clientOptions, Promise, Events) {
var Redis;
this.clusterNodes = clusterNodes;
this.clientOptions = clientOptions;
this.Promise = Promise;
this.Events = Events;
Redis = eval("require")("ioredis"); // Obfuscated or else Webpack/Angular will try to inline the optional ioredis module
this.client = new Redis(this.clientOptions);
this.subClient = new Redis(this.clientOptions);
if (this.clusterNodes != null) {
this.client = new Redis.Cluster(this.clusterNodes, this.clientOptions);
this.subClient = new Redis.Cluster(this.clusterNodes, this.clientOptions);
} else {
this.client = new Redis(this.clientOptions);
this.subClient = new Redis(this.clientOptions);
}
this.pubsubs = {};
this.ready = new this.Promise((resolve, reject) => {
var count, done, errorListener;
Expand Down Expand Up @@ -87,8 +93,13 @@
}

disconnect(flush) {
this.client.end(flush);
return this.subClient.end(flush);
if (flush) {
return this.Promise.all([this.client.quit(), this.subClient.quit()]);
} else {
this.client.disconnect();
this.subClient.disconnect();
return this.Promise.resolve();
}
}

};
Expand Down
2 changes: 1 addition & 1 deletion lib/LocalDatastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a
}

__disconnect__(flush) {
return this;
return this.Promise.resolve();
}

yieldLoop(t = 0) {
Expand Down
11 changes: 6 additions & 5 deletions lib/RedisConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

RedisConnection = class RedisConnection {
constructor(clientOptions, Promise, Events) {
var redis;
var Redis;
this.clientOptions = clientOptions;
this.Promise = Promise;
this.Events = Events;
redis = eval("require")("redis"); // Obfuscated or else Webpack/Angular will try to inline the optional redis module
this.client = redis.createClient(this.clientOptions);
this.subClient = redis.createClient(this.clientOptions);
Redis = eval("require")("redis"); // Obfuscated or else Webpack/Angular will try to inline the optional redis module
this.client = Redis.createClient(this.clientOptions);
this.subClient = Redis.createClient(this.clientOptions);
this.pubsubs = {};
this.shas = {};
this.ready = new this.Promise((resolve, reject) => {
Expand Down Expand Up @@ -99,7 +99,8 @@

disconnect(flush) {
this.client.end(flush);
return this.subClient.end(flush);
this.subClient.end(flush);
return this.Promise.resolve();
}

};
Expand Down
4 changes: 2 additions & 2 deletions lib/RedisDatastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a
this.originalId = this.instance.id;
parser.load(options, options, this);
this.isReady = false;
this.connection = this._groupConnection ? this._groupConnection : this.instance.datastore === "redis" ? new RedisConnection(this.clientOptions, this.Promise, this.instance.Events) : this.instance.datastore === "ioredis" ? new IORedisConnection(this.clientOptions, this.Promise, this.instance.Events) : void 0;
this.connection = this._groupConnection ? this._groupConnection : this.instance.datastore === "redis" ? new RedisConnection(this.clientOptions, this.Promise, this.instance.Events) : this.instance.datastore === "ioredis" ? new IORedisConnection(this.clusterNodes, this.clientOptions, this.Promise, this.instance.Events) : void 0;
this.ready = this.connection.ready.then(clients => {
var args;
this.clients = clients;
args = this.prepareInitSettings(options.clearDatastore);
args = this.prepareInitSettings(this.clearDatastore);
this.isReady = true;
return this.runScript("init", args);
}).then(() => {
Expand Down
5 changes: 4 additions & 1 deletion src/Bottleneck.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Bottleneck
reservoir: null,
storeInstanceDefaults:
clientOptions: {},
clusterNodes: null,
clearDatastore: false,
Promise: Promise,
timeout: null,
Expand Down Expand Up @@ -59,7 +60,9 @@ class Bottleneck
else throw new Bottleneck::BottleneckError "Invalid datastore type: #{@datastore}"
ready: => @_store.ready
clients: => @_store.clients
disconnect: (flush=true) => await @_store.__disconnect__ flush
disconnect: (flush=true) =>
await @_store.__disconnect__ flush
@
chain: (@_limiter) => @
queued: (priority) => if priority? then @_queues[priority].length else @_queues.reduce ((a, b) -> a+b.length), 0
empty: -> @queued() == 0 and @_submitLock.isEmpty()
Expand Down
Loading

0 comments on commit e7d9fe4

Please sign in to comment.