diff --git a/README.md b/README.md index 2955d81..01c2034 100644 --- a/README.md +++ b/README.md @@ -399,12 +399,13 @@ const limiter = new Bottleneck({ /* Some basic options */ maxConcurrent: 5, minTime: 500 + id: "my-super-app" // Should be unique for every limiter in the same Redis db /* Clustering options */ datastore: "redis", clearDatastore: false, clientOptions: { - /* node-redis client options, passed to redis.createClient() */ + // node-redis client options, passed to redis.createClient() // See https://github.com/NodeRedis/node_redis#options-object-properties host: "127.0.0.1", port: 6379 @@ -475,6 +476,8 @@ You must work around these limitations in your application code if they are an i The current design guarantees reliability and lets clients (limiters) come and go. Your application can scale up or down, and clients can be disconnected without issues. +It is **strongly recommended** that you give an `id` for every limiter since it is used to build the name of your limiter's Redis keys! Limiters with the same `id` inside the same Redis db will be sharing the same datastore! + It is **strongly recommended** that you set an `expiration` (See [Job Options](#job-options)) *on every job*, since that lets the cluster recover from crashed or disconnected clients. Otherwise, a client crashing while executing a job would not be able to tell the cluster to decrease its number of "running" jobs. By using expirations, those lost jobs are automatically cleared after the specified time has passed. Using expirations is essential to keeping a cluster reliable in the face of unpredictable application bugs, network hiccups, and so on. Network latency between Node.js and Redis is not taken into account when calculating timings (such as `minTime`). To minimize the impact of latency, Bottleneck performs the absolute minimum number of state accesses. Keeping the Redis server close to your limiters will help you get a more consistent experience. Keeping the clients' server time consistent will also help. @@ -499,14 +502,20 @@ clusterLimiter.ready() .catch((error) => { /* ... */ }); ``` - ##### Additional Clustering information +- At the moment, each limiter opens 2 connections to Redis. This can lead to a high number of connections, especially when Groups are used. This might change in a future release. - Bottleneck is compatible with [Redis Clusters](https://redis.io/topics/cluster-tutorial). - Bottleneck's data is stored in Redis keys beginning with `b_` and it uses the `bottleneck` pub/sub channel. It will not interfere with any other data stored on the server. - Bottleneck loads a few Lua scripts on the Redis server using the `SCRIPT LOAD` command. These scripts only take up a few Kb of memory. Running the `SCRIPT FLUSH` command will cause any connected limiters to experience critical errors until a new limiter connects to Redis and loads the scripts again. - The Lua scripts are highly optimized and designed to use as few resources (CPU, especially) as possible. +##### Groups and Clustering + +- If you are using a Group, the generated limiters automatically receive an `id` with the pattern `group-key-${KEY}`. +- A Group collects its own garbage, and so when using Clustering, it manages the Redis TTL ([TTL](https://redis.io/commands/ttl)) on the keys it uses to ensure they get cleaned up by Redis when unused for longer than the Group's `timeout` setting. +- Each limiter opens 2 connections to Redis. Be careful not to go over [Redis' `maxclients` value](https://redis.io/topics/clients). + ## Debugging your application @@ -588,15 +597,6 @@ group.on('created', (limiter, key) => { Listening for the `created` event is the recommended way to set up a new limiter. Your event handler is executed before `key()` returns the newly created limiter. -__stopAutoCleanup()__ - -Calling `stopAutoCleanup()` on a group will turn off its garbage collection, so limiters for keys that have not been used in over **5 minutes** will no longer be deleted. It can be reenabled by calling `startAutoCleanup()`. The `5 minutes` figure can be modified by calling `updateSettings()`. - - -__startAutoCleanup()__ - -Reactivate the group's garbage collection. - __updateSettings()__ ```js diff --git a/bottleneck.d.ts b/bottleneck.d.ts index 7d10e65..658cbf6 100644 --- a/bottleneck.d.ts +++ b/bottleneck.d.ts @@ -108,16 +108,6 @@ declare module "bottleneck" { */ removeAllListeners(name?: string): void; - /** - * Disables limiter garbage collection. - */ - stopAutoCleanup(): void; - - /** - * Enables limiter garbage collection. - */ - startAutoCleanup(): void; - /** * Updates the group settings. * @param options - The new settings. diff --git a/bottleneck.d.ts.ejs b/bottleneck.d.ts.ejs index e46b4c2..b1d2220 100644 --- a/bottleneck.d.ts.ejs +++ b/bottleneck.d.ts.ejs @@ -108,16 +108,6 @@ declare module "bottleneck" { */ removeAllListeners(name?: string): void; - /** - * Disables limiter garbage collection. - */ - stopAutoCleanup(): void; - - /** - * Enables limiter garbage collection. - */ - startAutoCleanup(): void; - /** * Updates the group settings. * @param options - The new settings. diff --git a/bottleneck.js b/bottleneck.js index 661059d..565084c 100644 --- a/bottleneck.js +++ b/bottleneck.js @@ -464,7 +464,8 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a Bottleneck.prototype.storeInstanceDefaults = { clientOptions: {}, clearDatastore: false, - Promise: Promise + Promise: Promise, + _groupTimeout: null }; Bottleneck.prototype.instanceDefaults = { @@ -638,14 +639,12 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a // Generated by CoffeeScript 2.2.2 (function () { - var BottleneckError, Events, Group, parser; + var Events, Group, parser; parser = require("./parser"); Events = require("./Events"); - BottleneckError = require("./BottleneckError"); - Group = function () { class Group { constructor(limiterOptions = {}, groupOptions = {}) { @@ -653,25 +652,24 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a this.deleteKey = this.deleteKey.bind(this); this.limiters = this.limiters.bind(this); this.keys = this.keys.bind(this); - this.startAutoCleanup = this.startAutoCleanup.bind(this); - this.stopAutoCleanup = this.stopAutoCleanup.bind(this); + this._startAutoCleanup = this._startAutoCleanup.bind(this); this.updateSettings = this.updateSettings.bind(this); this.limiterOptions = limiterOptions; - if (this.limiterOptions.datastore === "redis") { - throw new BottleneckError("Groups do not currently support Clustering. This will be implemented in a future version. Please open an issue at https://github.com/SGrondin/bottleneck/issues if you would like this feature to be implemented."); - } parser.load(groupOptions, this.defaults, this); this.Events = new Events(this); this.instances = {}; this.Bottleneck = require("./Bottleneck"); - this.startAutoCleanup(); + this._startAutoCleanup(); } key(key = "") { var ref; return (ref = this.instances[key]) != null ? ref : (() => { var limiter; - limiter = this.instances[key] = new this.Bottleneck(this.limiterOptions); + limiter = this.instances[key] = new this.Bottleneck(Object.assign(this.limiterOptions, { + id: `group-key-${key}`, + _groupTimeout: this.timeout + })); this.Events.trigger("created", [limiter, key]); return limiter; })(); @@ -703,42 +701,37 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a return Object.keys(this.instances); } - startAutoCleanup() { + _startAutoCleanup() { var _this = this; var base; - this.stopAutoCleanup(); + clearInterval(this.interval); return typeof (base = this.interval = setInterval(_asyncToGenerator(function* () { - var check, e, k, ref, results, time, v; + var e, k, ref, results, time, v; time = Date.now(); ref = _this.instances; results = []; for (k in ref) { v = ref[k]; try { - check = yield v._store.__groupCheck__(); - if (check + _this.timeout < time) { + if (yield v._store.__groupCheck__(time)) { results.push(_this.deleteKey(k)); } else { results.push(void 0); } } catch (error) { e = error; - results.push(v._trigger("error", [e])); + results.push(v.Events.trigger("error", [e])); } } return results; }), this.timeout / 2)).unref === "function" ? base.unref() : void 0; } - stopAutoCleanup() { - return clearInterval(this.interval); - } - updateSettings(options = {}) { parser.overwrite(options, this.defaults, this); if (options.timeout != null) { - return this.startAutoCleanup(); + return this._startAutoCleanup(); } } @@ -753,7 +746,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a module.exports = Group; }).call(undefined); -},{"./Bottleneck":1,"./BottleneckError":2,"./Events":4,"./parser":11}],6:[function(require,module,exports){ +},{"./Bottleneck":1,"./Events":4,"./parser":11}],6:[function(require,module,exports){ "use strict"; function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; } @@ -813,12 +806,12 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a })(); } - __groupCheck__() { + __groupCheck__(time) { var _this3 = this; return _asyncToGenerator(function* () { yield _this3.yieldLoop(); - return _this3._nextRequest; + return _this3._nextRequest + _this3._groupTimeout < time; })(); } @@ -956,7 +949,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a // Generated by CoffeeScript 2.2.2 (function () { - var BottleneckError, DLList, RedisStorage, libraries, lua, parser, scripts; + var BottleneckError, DLList, RedisStorage, libraries, lua, parser, scriptTemplates; parser = require("./parser"); @@ -969,60 +962,64 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a libraries = { get_time: lua["get_time.lua"], refresh_running: lua["refresh_running.lua"], - conditions_check: lua["conditions_check.lua"] + conditions_check: lua["conditions_check.lua"], + refresh_expiration: lua["refresh_expiration.lua"], + validate_keys: lua["validate_keys.lua"] }; - scripts = { - init: { - keys: ["b_settings", "b_running", "b_executing"], - libs: [], - code: lua["init.lua"] - }, - update_settings: { - keys: ["b_settings"], - libs: [], - code: lua["update_settings.lua"] - }, - running: { - keys: ["b_settings", "b_running", "b_executing"], - libs: ["refresh_running"], - code: lua["running.lua"] - }, - group_check: { - keys: ["b_settings"], - libs: [], - code: lua["group_check.lua"] - }, - check: { - keys: ["b_settings", "b_running", "b_executing"], - libs: ["refresh_running", "conditions_check"], - code: lua["check.lua"] - }, - submit: { - keys: ["b_settings", "b_running", "b_executing"], - libs: ["refresh_running", "conditions_check"], - code: lua["submit.lua"] - }, - register: { - keys: ["b_settings", "b_running", "b_executing"], - libs: ["refresh_running", "conditions_check"], - code: lua["register.lua"] - }, - free: { - keys: ["b_settings", "b_running", "b_executing"], - libs: ["refresh_running"], - code: lua["free.lua"] - }, - current_reservoir: { - keys: ["b_settings"], - libs: [], - code: lua["current_reservoir.lua"] - }, - increment_reservoir: { - keys: ["b_settings"], - libs: [], - code: lua["increment_reservoir.lua"] - } + scriptTemplates = function scriptTemplates(id) { + return { + init: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["refresh_expiration"], + code: lua["init.lua"] + }, + update_settings: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["validate_keys", "refresh_expiration"], + code: lua["update_settings.lua"] + }, + running: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["validate_keys", "refresh_running"], + code: lua["running.lua"] + }, + group_check: { + keys: [`b_${id}_settings`], + libs: [], + code: lua["group_check.lua"] + }, + check: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["validate_keys", "refresh_running", "conditions_check"], + code: lua["check.lua"] + }, + submit: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["validate_keys", "refresh_running", "conditions_check", "refresh_expiration"], + code: lua["submit.lua"] + }, + register: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["validate_keys", "refresh_running", "conditions_check", "refresh_expiration"], + code: lua["register.lua"] + }, + free: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["validate_keys", "refresh_running"], + code: lua["free.lua"] + }, + current_reservoir: { + keys: [`b_${id}_settings`], + libs: ["validate_keys"], + code: lua["current_reservoir.lua"] + }, + increment_reservoir: { + keys: [`b_${id}_settings`], + libs: ["validate_keys"], + code: lua["increment_reservoir.lua"] + } + }; }; RedisStorage = class RedisStorage { @@ -1034,6 +1031,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a redis = r(function () { return ["r", "e", "d", "i", "s"].join(""); // Obfuscated or else Webpack/Angular will try to inline the optional redis module }()); + this.scripts = scriptTemplates(this.instance.id); parser.load(options, options, this); this.client = redis.createClient(this.clientOptions); this.subClient = redis.createClient(this.clientOptions); @@ -1092,6 +1090,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a initSettings.running = 0; initSettings.unblockTime = 0; initSettings.version = this.instance.version; + initSettings.groupTimeout = this._groupTimeout; args = this.prepareObject(initSettings); args.unshift(options.clearDatastore ? 1 : 0); this.isReady = true; @@ -1110,9 +1109,9 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a loadScript(name) { return new this.Promise((resolve, reject) => { var payload; - payload = scripts[name].libs.map(function (lib) { + payload = this.scripts[name].libs.map(function (lib) { return libraries[lib]; - }).join("\n") + scripts[name].code; + }).join("\n") + this.scripts[name].code; return this.client.multi([["script", "load", payload]]).exec((err, replies) => { if (err != null) { return reject(err); @@ -1126,10 +1125,11 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a loadAll() { var k, v; return this.Promise.all(function () { - var results1; + var ref, results1; + ref = this.scripts; results1 = []; - for (k in scripts) { - v = scripts[k]; + for (k in ref) { + v = ref[k]; results1.push(this.loadScript(k)); } return results1; @@ -1157,12 +1157,13 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a } runScript(name, args) { + var script; if (!this.isReady) { return this.Promise.reject(new BottleneckError("This limiter is not done connecting to Redis yet. Wait for the 'ready' event to be triggered before submitting requests.")); } else { + script = this.scripts[name]; return new this.Promise((resolve, reject) => { - var arr, script; - script = scripts[name]; + var arr; arr = [this.shas[name], script.keys.length].concat(script.keys, args, function (err, replies) { if (err != null) { return reject(err); @@ -1171,6 +1172,12 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a }); this.instance.Events.trigger("debug", [`Calling Redis script: ${name}.lua`, args]); return this.client.evalsha.bind(this.client).apply({}, arr); + }).catch(e => { + if (e.message === "SETTINGS_KEY_NOT_FOUND") { + return this.Promise.reject(new BottleneckError(`Bottleneck limiter (id: '${this.instance.id}') could not find the Redis key it needs to complete this action (key '${script.keys[0]}'), was it deleted?${this._groupTimeout != null ? ' Note: This limiter is in a Group, it could have been garbage collected.' : ''}`)); + } else { + return this.Promise.reject(e); + } }); } } @@ -1199,7 +1206,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a var _this3 = this; return _asyncToGenerator(function* () { - return parseInt((yield _this3.runScript("group_check", [])), 10); + return _this3.convertBool((yield _this3.runScript("group_check", []))); })(); } @@ -1392,14 +1399,16 @@ module.exports={ "current_reservoir.lua": "local settings_key = KEYS[1]\n\nreturn tonumber(redis.call('hget', settings_key, 'reservoir'))\n", "free.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal index = ARGV[1]\nlocal now = ARGV[2]\n\nredis.call('zadd', executing_key, 0, index)\n\nreturn refresh_running(executing_key, running_key, settings_key, now)\n", "get_time.lua": "redis.replicate_commands()\n\nlocal get_time = function ()\n local time = redis.call('time')\n\n return tonumber(time[1]..string.sub(time[2], 1, 3))\nend\n", - "group_check.lua": "local settings_key = KEYS[1]\n\nreturn redis.call('hget', settings_key, 'nextRequest')\n", + "group_check.lua": "local settings_key = KEYS[1]\n\nreturn not (redis.call('exists', settings_key) == 1)\n", "increment_reservoir.lua": "local settings_key = KEYS[1]\nlocal incr = ARGV[1]\n\nreturn redis.call('hincrby', settings_key, 'reservoir', incr)\n", - "init.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal clear = tonumber(ARGV[1])\n\nif clear == 1 then\n redis.call('del', settings_key, running_key, executing_key)\nend\n\nif redis.call('exists', settings_key) == 0 then\n local args = {'hmset', settings_key}\n\n for i = 2, #ARGV do\n table.insert(args, ARGV[i])\n end\n\n redis.call(unpack(args))\nend\n\nreturn {}\n", + "init.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal clear = tonumber(ARGV[1])\n\nif clear == 1 then\n redis.call('del', settings_key, running_key, executing_key)\nend\n\nif redis.call('exists', settings_key) == 0 then\n local args = {'hmset', settings_key}\n\n for i = 2, #ARGV do\n table.insert(args, ARGV[i])\n end\n\n redis.call(unpack(args))\nend\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(executing_key, running_key, settings_key, 0, 0, groupTimeout)\n\nreturn {}\n", + "refresh_expiration.lua": "local refresh_expiration = function (executing_key, running_key, settings_key, now, nextRequest, groupTimeout)\n\n if groupTimeout ~= nil then\n local ttl = (nextRequest + groupTimeout) - now\n\n redis.call('pexpire', executing_key, ttl)\n redis.call('pexpire', running_key, ttl)\n redis.call('pexpire', settings_key, ttl)\n end\n\nend\n", "refresh_running.lua": "local refresh_running = function (executing_key, running_key, settings_key, now)\n\n local expired = redis.call('zrangebyscore', executing_key, '-inf', '('..now)\n\n if #expired == 0 then\n return redis.call('hget', settings_key, 'running')\n else\n redis.call('zremrangebyscore', executing_key, '-inf', '('..now)\n\n local args = {'hmget', running_key}\n for i = 1, #expired do\n table.insert(args, expired[i])\n end\n\n local weights = redis.call(unpack(args))\n\n args[1] = 'hdel'\n local deleted = redis.call(unpack(args))\n\n local total = 0\n for i = 1, #weights do\n total = total + (tonumber(weights[i]) or 0)\n end\n local incr = -total\n if total == 0 then\n incr = 0\n else\n redis.call('publish', 'bottleneck', 'freed:'..total)\n end\n\n return redis.call('hincrby', settings_key, 'running', incr)\n end\n\nend\n", - "register.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal index = ARGV[1]\nlocal weight = tonumber(ARGV[2])\nlocal expiration = tonumber(ARGV[3])\nlocal now = tonumber(ARGV[4])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'reservoir',\n 'nextRequest',\n 'minTime'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal reservoir = tonumber(settings[2])\nlocal nextRequest = tonumber(settings[3])\nlocal minTime = tonumber(settings[4])\n\nif conditions_check(weight, maxConcurrent, running, reservoir) then\n\n if expiration ~= nil then\n redis.call('zadd', executing_key, now + expiration, index)\n end\n redis.call('hset', running_key, index, weight)\n redis.call('hincrby', settings_key, 'running', weight)\n\n local wait = math.max(nextRequest - now, 0)\n\n if reservoir == nil then\n redis.call('hset', settings_key,\n 'nextRequest', now + wait + minTime\n )\n else\n reservoir = reservoir - weight\n redis.call('hmset', settings_key,\n 'reservoir', reservoir,\n 'nextRequest', now + wait + minTime\n )\n end\n\n return {true, wait, reservoir}\n\nelse\n return {false}\nend\n", + "register.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal index = ARGV[1]\nlocal weight = tonumber(ARGV[2])\nlocal expiration = tonumber(ARGV[3])\nlocal now = tonumber(ARGV[4])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'reservoir',\n 'nextRequest',\n 'minTime',\n 'groupTimeout'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal reservoir = tonumber(settings[2])\nlocal nextRequest = tonumber(settings[3])\nlocal minTime = tonumber(settings[4])\nlocal groupTimeout = tonumber(settings[5])\n\nif conditions_check(weight, maxConcurrent, running, reservoir) then\n\n if expiration ~= nil then\n redis.call('zadd', executing_key, now + expiration, index)\n end\n redis.call('hset', running_key, index, weight)\n redis.call('hincrby', settings_key, 'running', weight)\n\n local wait = math.max(nextRequest - now, 0)\n local newNextRequest = now + wait + minTime\n\n if reservoir == nil then\n redis.call('hset', settings_key,\n 'nextRequest', newNextRequest\n )\n else\n reservoir = reservoir - weight\n redis.call('hmset', settings_key,\n 'reservoir', reservoir,\n 'nextRequest', newNextRequest\n )\n end\n\n refresh_expiration(executing_key, running_key, settings_key, now, newNextRequest, groupTimeout)\n\n return {true, wait, reservoir}\n\nelse\n return {false}\nend\n", "running.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\nlocal now = ARGV[1]\n\nreturn tonumber(refresh_running(executing_key, running_key, settings_key, now))\n", - "submit.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal queueLength = tonumber(ARGV[1])\nlocal weight = tonumber(ARGV[2])\nlocal now = tonumber(ARGV[3])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'highWater',\n 'reservoir',\n 'nextRequest',\n 'strategy',\n 'unblockTime',\n 'penalty',\n 'minTime'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal highWater = tonumber(settings[2])\nlocal reservoir = tonumber(settings[3])\nlocal nextRequest = tonumber(settings[4])\nlocal strategy = tonumber(settings[5])\nlocal unblockTime = tonumber(settings[6])\nlocal penalty = tonumber(settings[7])\nlocal minTime = tonumber(settings[8])\n\nif maxConcurrent ~= nil and weight > maxConcurrent then\n return redis.error_reply('OVERWEIGHT:'..weight..':'..maxConcurrent)\nend\n\nlocal reachedHWM = (highWater ~= nil and queueLength == highWater\n and not (\n conditions_check(weight, maxConcurrent, running, reservoir)\n and nextRequest - now <= 0\n )\n)\n\nlocal blocked = strategy == 3 and (reachedHWM or unblockTime >= now)\n\nif blocked then\n local computedPenalty = penalty\n if computedPenalty == nil then\n if minTime == 0 then\n computedPenalty = 5000\n else\n computedPenalty = 15 * minTime\n end\n end\n\n redis.call('hmset', settings_key,\n 'unblockTime', now + computedPenalty,\n 'nextRequest', unblockTime + minTime\n )\nend\n\nreturn {reachedHWM, blocked, strategy}\n", - "update_settings.lua": "local settings_key = KEYS[1]\n\nlocal args = {'hmset', settings_key}\n\nfor i = 1, #ARGV do\n table.insert(args, ARGV[i])\nend\n\nredis.call(unpack(args))\n\nreturn {}\n" + "submit.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal queueLength = tonumber(ARGV[1])\nlocal weight = tonumber(ARGV[2])\nlocal now = tonumber(ARGV[3])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'highWater',\n 'reservoir',\n 'nextRequest',\n 'strategy',\n 'unblockTime',\n 'penalty',\n 'minTime',\n 'groupTimeout'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal highWater = tonumber(settings[2])\nlocal reservoir = tonumber(settings[3])\nlocal nextRequest = tonumber(settings[4])\nlocal strategy = tonumber(settings[5])\nlocal unblockTime = tonumber(settings[6])\nlocal penalty = tonumber(settings[7])\nlocal minTime = tonumber(settings[8])\nlocal groupTimeout = tonumber(settings[9])\n\nif maxConcurrent ~= nil and weight > maxConcurrent then\n return redis.error_reply('OVERWEIGHT:'..weight..':'..maxConcurrent)\nend\n\nlocal reachedHWM = (highWater ~= nil and queueLength == highWater\n and not (\n conditions_check(weight, maxConcurrent, running, reservoir)\n and nextRequest - now <= 0\n )\n)\n\nlocal blocked = strategy == 3 and (reachedHWM or unblockTime >= now)\n\nif blocked then\n local computedPenalty = penalty\n if computedPenalty == nil then\n if minTime == 0 then\n computedPenalty = 5000\n else\n computedPenalty = 15 * minTime\n end\n end\n\n local newNextRequest = unblockTime + minTime\n\n redis.call('hmset', settings_key,\n 'unblockTime', now + computedPenalty,\n 'nextRequest', newNextRequest\n )\n\n refresh_expiration(executing_key, running_key, settings_key, now, newNextRequest, groupTimeout)\nend\n\nreturn {reachedHWM, blocked, strategy}\n", + "update_settings.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal args = {'hmset', settings_key}\n\nfor i = 1, #ARGV do\n table.insert(args, ARGV[i])\nend\n\nredis.call(unpack(args))\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(executing_key, running_key, settings_key, 0, 0, groupTimeout)\n\nreturn {}\n", + "validate_keys.lua": "local settings_key = KEYS[1]\n\nif not (redis.call('exists', settings_key) == 1) then\n return redis.error_reply('SETTINGS_KEY_NOT_FOUND')\nend\n" } },{}],11:[function(require,module,exports){ diff --git a/bottleneck.min.js b/bottleneck.min.js index 014b286..f3fd22b 100644 --- a/bottleneck.min.js +++ b/bottleneck.min.js @@ -1 +1 @@ -(function(){function e(t,n,r){function s(o,u){if(!n[o]){if(!t[o]){var a=typeof require=="function"&&require;if(!u&&a)return a(o,!0);if(i)return i(o,!0);var f=new Error("Cannot find module '"+o+"'");throw f.code="MODULE_NOT_FOUND",f}var l=n[o]={exports:{}};t[o][0].call(l.exports,function(e){var n=t[o][1][e];return s(n?n:e)},l,l.exports,e,t,n,r)}return n[o].exports}var i=typeof require=="function"&&require;for(var o=0;o=ref;i=1<=ref?++j:--j){results.push(new DLList)}return results}chain(_limiter){this._limiter=_limiter;return this}_sanitizePriority(priority){var sProperty;sProperty=~~priority!==priority?DEFAULT_PRIORITY:priority;if(sProperty<0){return 0}else if(sProperty>NUM_PRIORITIES-1){return NUM_PRIORITIES-1}else{return sProperty}}_find(arr,fn){var ref;return(ref=function(){var i,j,len,x;for(i=j=0,len=arr.length;j0})}_randomIndex(){return Math.random().toString(36).slice(2)}check(weight=1){var _this3=this;return _asyncToGenerator(function*(){return yield _this3._store.__check__(weight)})()}_run(next,wait,index){var _this4=this;var completed,done;this.Events.trigger("debug",[`Scheduling ${next.options.id}`,{args:next.args,options:next.options}]);done=false;completed=(()=>{var _ref=_asyncToGenerator(function*(...args){var e,ref,running;if(!done){try{done=true;clearTimeout(_this4._executing[index].expiration);delete _this4._executing[index];_this4.Events.trigger("debug",[`Completed ${next.options.id}`,{args:next.args,options:next.options}]);var _ref2=yield _this4._store.__free__(index,next.options.weight);running=_ref2.running;_this4.Events.trigger("debug",[`Freed ${next.options.id}`,{args:next.args,options:next.options}]);_this4._drainAll().catch(function(e){return _this4.Events.trigger("error",[e])});if(running===0&&_this4.empty()){_this4.Events.trigger("idle",[])}return(ref=next.cb)!=null?ref.apply({},args):void 0}catch(error){e=error;return _this4.Events.trigger("error",[e])}}});return function completed(){return _ref.apply(this,arguments)}})();return this._executing[index]={timeout:setTimeout(()=>{this.Events.trigger("debug",[`Executing ${next.options.id}`,{args:next.args,options:next.options}]);if(this._limiter!=null){return this._limiter.submit.apply(this._limiter,Array.prototype.concat(next.options,next.task,next.args,completed))}else{return next.task.apply({},next.args.concat(completed))}},wait),expiration:next.options.expiration!=null?setTimeout(()=>{return completed(new Bottleneck.prototype.BottleneckError(`This job timed out after ${next.options.expiration} ms.`))},next.options.expiration):void 0,job:next}}_drainOne(freed){return this._registerLock.schedule(()=>{var args,index,options,queue;if(this.queued()===0){return this.Promise.resolve(false)}queue=this._getFirst(this._queues);var _queue$first=queue.first();options=_queue$first.options;args=_queue$first.args;if(freed!=null&&options.weight>freed){return this.Promise.resolve(false)}this.Events.trigger("debug",[`Draining ${options.id}`,{args:args,options:options}]);index=this._randomIndex();return this._store.__register__(index,options.weight,options.expiration).then(({success:success,wait:wait,reservoir:reservoir})=>{var empty,next;this.Events.trigger("debug",[`Drained ${options.id}`,{success:success,args:args,options:options}]);if(success){next=queue.shift();empty=this.empty();if(empty){this.Events.trigger("empty",[])}if(reservoir===0){this.Events.trigger("depleted",[empty])}this._run(next,wait,index)}return this.Promise.resolve(success)})})}_drainAll(freed){return this._drainOne(freed).then(success=>{if(success){return this._drainAll()}else{return this.Promise.resolve(success)}}).catch(e=>{return this.Events.trigger("error",[e])})}submit(...args){var _this5=this;var cb,job,options,ref,ref1,task;if(typeof args[0]==="function"){var _ref3,_ref4,_splice$call,_splice$call2;ref=args,_ref3=ref,_ref4=_toArray(_ref3),task=_ref4[0],args=_ref4.slice(1),_ref3,_splice$call=splice.call(args,-1),_splice$call2=_slicedToArray(_splice$call,1),cb=_splice$call2[0],_splice$call;options=this.jobDefaults}else{var _ref5,_ref6,_splice$call3,_splice$call4;ref1=args,_ref5=ref1,_ref6=_toArray(_ref5),options=_ref6[0],task=_ref6[1],args=_ref6.slice(2),_ref5,_splice$call3=splice.call(args,-1),_splice$call4=_slicedToArray(_splice$call3,1),cb=_splice$call4[0],_splice$call3;options=parser.load(options,this.jobDefaults)}job={options:options,task:task,args:args,cb:cb};options.priority=this._sanitizePriority(options.priority);this.Events.trigger("debug",[`Queueing ${options.id}`,{args:args,options:options}]);return this._submitLock.schedule(_asyncToGenerator(function*(){var blocked,e,reachedHWM,shifted,strategy;try{var _ref8=yield _this5._store.__submit__(_this5.queued(),options.weight);reachedHWM=_ref8.reachedHWM;blocked=_ref8.blocked;strategy=_ref8.strategy;_this5.Events.trigger("debug",[`Queued ${options.id}`,{args:args,options:options,reachedHWM:reachedHWM,blocked:blocked}])}catch(error){e=error;_this5.Events.trigger("debug",[`Could not queue ${options.id}`,{args:args,options:options,error:e}]);job.cb(e);return false}if(blocked){_this5._queues=_this5._makeQueues();_this5.Events.trigger("dropped",[job]);return true}else if(reachedHWM){shifted=strategy===Bottleneck.prototype.strategy.LEAK?_this5._getFirst(_this5._queues.slice(options.priority).reverse()).shift():strategy===Bottleneck.prototype.strategy.OVERFLOW_PRIORITY?_this5._getFirst(_this5._queues.slice(options.priority+1).reverse()).shift():strategy===Bottleneck.prototype.strategy.OVERFLOW?job:void 0;if(shifted!=null){_this5.Events.trigger("dropped",[shifted])}if(shifted==null||strategy===Bottleneck.prototype.strategy.OVERFLOW){return reachedHWM}}_this5._queues[options.priority].push(job);yield _this5._drainAll();return reachedHWM}))}schedule(...args){var options,task,wrapped;if(typeof args[0]==="function"){var _args=args;var _args2=_toArray(_args);task=_args2[0];args=_args2.slice(1);options=this.jobDefaults}else{var _args3=args;var _args4=_toArray(_args3);options=_args4[0];task=_args4[1];args=_args4.slice(2);options=parser.load(options,this.jobDefaults)}wrapped=function wrapped(...args){var _ref9,_ref10,_splice$call5,_splice$call6;var cb,ref;ref=args,_ref9=ref,_ref10=_toArray(_ref9),args=_ref10.slice(0),_ref9,_splice$call5=splice.call(args,-1),_splice$call6=_slicedToArray(_splice$call5,1),cb=_splice$call6[0],_splice$call5;return task.apply({},args).then(function(...args){return cb.apply({},Array.prototype.concat(null,args))}).catch(function(...args){return cb.apply({},args)})};return new this.Promise((resolve,reject)=>{return this.submit.apply({},Array.prototype.concat(options,wrapped,args,function(...args){return(args[0]!=null?reject:(args.shift(),resolve)).apply({},args)})).catch(e=>{return this.Events.trigger("error",[e])})})}wrap(fn){return(...args)=>{return this.schedule.apply({},Array.prototype.concat(fn,args))}}updateSettings(options={}){var _this6=this;return _asyncToGenerator(function*(){yield _this6._store.__updateSettings__(parser.overwrite(options,_this6.storeDefaults));parser.overwrite(options,_this6.instanceDefaults,_this6);_this6._drainAll().catch(function(e){return _this6.Events.trigger("error",[e])});return _this6})()}currentReservoir(){var _this7=this;return _asyncToGenerator(function*(){return yield _this7._store.__currentReservoir__()})()}incrementReservoir(incr=0){var _this8=this;return _asyncToGenerator(function*(){yield _this8._store.__incrementReservoir__(incr);_this8._drainAll().catch(function(e){return _this8.Events.trigger("error",[e])});return _this8})()}}Bottleneck.default=Bottleneck;Bottleneck.version=Bottleneck.prototype.version=packagejson.version;Bottleneck.strategy=Bottleneck.prototype.strategy={LEAK:1,OVERFLOW:2,OVERFLOW_PRIORITY:4,BLOCK:3};Bottleneck.BottleneckError=Bottleneck.prototype.BottleneckError=require("./BottleneckError");Bottleneck.Group=Bottleneck.prototype.Group=require("./Group");Bottleneck.prototype.jobDefaults={priority:DEFAULT_PRIORITY,weight:1,expiration:null,id:""};Bottleneck.prototype.storeDefaults={maxConcurrent:null,minTime:0,highWater:null,strategy:Bottleneck.prototype.strategy.LEAK,penalty:null,reservoir:null};Bottleneck.prototype.storeInstanceDefaults={clientOptions:{},clearDatastore:false,Promise:Promise};Bottleneck.prototype.instanceDefaults={datastore:"local",id:"",rejectOnDrop:true,Promise:Promise};return Bottleneck}.call(this);module.exports=Bottleneck}).call(undefined)},{"../package.json":12,"./BottleneckError":2,"./DLList":3,"./Events":4,"./Group":5,"./Local":6,"./RedisStorage":7,"./Sync":8,"./parser":11}],2:[function(require,module,exports){"use strict";(function(){var BottleneckError;BottleneckError=class BottleneckError extends Error{};module.exports=BottleneckError}).call(undefined)},{}],3:[function(require,module,exports){"use strict";(function(){var DLList;DLList=class DLList{constructor(){this._first=null;this._last=null;this.length=0}push(value){var node;this.length++;node={value:value,next:null};if(this._last!=null){this._last.next=node;this._last=node}else{this._first=this._last=node}return void 0}shift(){var ref1,value;if(this._first==null){return void 0}else{this.length--}value=this._first.value;this._first=(ref1=this._first.next)!=null?ref1:this._last=null;return value}first(){if(this._first!=null){return this._first.value}}getArray(){var node,ref,results;node=this._first;results=[];while(node!=null){results.push((ref=node,node=node.next,ref.value))}return results}};module.exports=DLList}).call(undefined)},{}],4:[function(require,module,exports){"use strict";(function(){var BottleneckError,Events;BottleneckError=require("./BottleneckError");Events=class Events{constructor(instance){this.instance=instance;this._events={};this.instance.on=((name,cb)=>{return this._addListener(name,"many",cb)});this.instance.once=((name,cb)=>{return this._addListener(name,"once",cb)});this.instance.removeAllListeners=((name=null)=>{if(name!=null){return delete this._events[name]}else{return this._events={}}})}_addListener(name,status,cb){var base;if((base=this._events)[name]==null){base[name]=[]}return this._events[name].push({cb:cb,status:status})}trigger(name,args){if(name!=="debug"){this.trigger("debug",[`Event triggered: ${name}`,args])}if(name==="dropped"&&this.instance.rejectOnDrop){args.forEach(function(job){return job.cb.apply({},[new BottleneckError("This job has been dropped by Bottleneck")])})}if(this._events[name]==null){return}this._events[name]=this._events[name].filter(function(listener){return listener.status!=="none"});return this._events[name].forEach(listener=>{var e,ret;if(listener.status==="none"){return}if(listener.status==="once"){listener.status="none"}try{ret=listener.cb.apply({},args);if(typeof(ret!=null?ret.then:void 0)==="function"){return ret.then(function(){}).catch(e=>{return this.trigger("error",[e])})}}catch(error){e=error;if("name"!=="error"){return this.trigger("error",[e])}}})}};module.exports=Events}).call(undefined)},{"./BottleneckError":2}],5:[function(require,module,exports){"use strict";function _asyncToGenerator(fn){return function(){var gen=fn.apply(this,arguments);return new Promise(function(resolve,reject){function step(key,arg){try{var info=gen[key](arg);var value=info.value}catch(error){reject(error);return}if(info.done){resolve(value)}else{return Promise.resolve(value).then(function(value){step("next",value)},function(err){step("throw",err)})}}return step("next")})}}(function(){var BottleneckError,Events,Group,parser;parser=require("./parser");Events=require("./Events");BottleneckError=require("./BottleneckError");Group=function(){class Group{constructor(limiterOptions={},groupOptions={}){this.key=this.key.bind(this);this.deleteKey=this.deleteKey.bind(this);this.limiters=this.limiters.bind(this);this.keys=this.keys.bind(this);this.startAutoCleanup=this.startAutoCleanup.bind(this);this.stopAutoCleanup=this.stopAutoCleanup.bind(this);this.updateSettings=this.updateSettings.bind(this);this.limiterOptions=limiterOptions;if(this.limiterOptions.datastore==="redis"){throw new BottleneckError("Groups do not currently support Clustering. This will be implemented in a future version. Please open an issue at https://github.com/SGrondin/bottleneck/issues if you would like this feature to be implemented.")}parser.load(groupOptions,this.defaults,this);this.Events=new Events(this);this.instances={};this.Bottleneck=require("./Bottleneck");this.startAutoCleanup()}key(key=""){var ref;return(ref=this.instances[key])!=null?ref:(()=>{var limiter;limiter=this.instances[key]=new this.Bottleneck(this.limiterOptions);this.Events.trigger("created",[limiter,key]);return limiter})()}deleteKey(key=""){var ref;if((ref=this.instances[key])!=null){ref.disconnect()}return delete this.instances[key]}limiters(){var k,ref,results,v;ref=this.instances;results=[];for(k in ref){v=ref[k];results.push({key:k,limiter:v})}return results}keys(){return Object.keys(this.instances)}startAutoCleanup(){var _this=this;var base;this.stopAutoCleanup();return typeof(base=this.interval=setInterval(_asyncToGenerator(function*(){var check,e,k,ref,results,time,v;time=Date.now();ref=_this.instances;results=[];for(k in ref){v=ref[k];try{check=yield v._store.__groupCheck__();if(check+_this.timeout=0)}__incrementReservoir__(incr){var _this4=this;return _asyncToGenerator(function*(){yield _this4.yieldLoop();return _this4.reservoir+=incr})()}__currentReservoir__(){var _this5=this;return _asyncToGenerator(function*(){yield _this5.yieldLoop();return _this5.reservoir})()}isBlocked(now){return this._unblockTime>=now}check(weight,now){return this.conditionsCheck(weight)&&this._nextRequest-now<=0}__check__(weight){var _this6=this;return _asyncToGenerator(function*(){var now;yield _this6.yieldLoop();now=Date.now();return _this6.check(weight,now)})()}__register__(index,weight,expiration){var _this7=this;return _asyncToGenerator(function*(){var now,wait;yield _this7.yieldLoop();now=Date.now();if(_this7.conditionsCheck(weight)){_this7._running+=weight;_this7._executing[index]={timeout:expiration!=null?setTimeout(function(){if(!_this7._executing[index].freed){_this7._executing[index].freed=true;return _this7._running-=weight}},expiration):void 0,freed:false};if(_this7.reservoir!=null){_this7.reservoir-=weight}wait=Math.max(_this7._nextRequest-now,0);_this7._nextRequest=now+wait+_this7.minTime;return{success:true,wait:wait,reservoir:_this7.reservoir}}else{return{success:false}}})()}strategyIsBlock(){return this.strategy===3}__submit__(queueLength,weight){var _this8=this;return _asyncToGenerator(function*(){var blocked,now,reachedHWM;yield _this8.yieldLoop();if(_this8.maxConcurrent!=null&&weight>_this8.maxConcurrent){throw new BottleneckError(`Impossible to add a job having a weight of ${weight} to a limiter having a maxConcurrent setting of ${_this8.maxConcurrent}`)}now=Date.now();reachedHWM=_this8.highWater!=null&&queueLength===_this8.highWater&&!_this8.check(weight,now);blocked=_this8.strategyIsBlock()&&(reachedHWM||_this8.isBlocked(now));if(blocked){_this8._unblockTime=now+_this8.computePenalty();_this8._nextRequest=_this8._unblockTime+_this8.minTime}return{reachedHWM:reachedHWM,blocked:blocked,strategy:_this8.strategy}})()}__free__(index,weight){var _this9=this;return _asyncToGenerator(function*(){yield _this9.yieldLoop();clearTimeout(_this9._executing[index].timeout);if(!_this9._executing[index].freed){_this9._executing[index].freed=true;_this9._running-=weight}return{running:_this9._running}})()}};module.exports=Local}).call(undefined)},{"./BottleneckError":2,"./DLList":3,"./parser":11}],7:[function(require,module,exports){"use strict";var _slicedToArray=function(){function sliceIterator(arr,i){var _arr=[];var _n=true;var _d=false;var _e=undefined;try{for(var _i=arr[Symbol.iterator](),_s;!(_n=(_s=_i.next()).done);_n=true){_arr.push(_s.value);if(i&&_arr.length===i)break}}catch(err){_d=true;_e=err}finally{try{if(!_n&&_i["return"])_i["return"]()}finally{if(_d)throw _e}}return _arr}return function(arr,i){if(Array.isArray(arr)){return arr}else if(Symbol.iterator in Object(arr)){return sliceIterator(arr,i)}else{throw new TypeError("Invalid attempt to destructure non-iterable instance")}}}();function _asyncToGenerator(fn){return function(){var gen=fn.apply(this,arguments);return new Promise(function(resolve,reject){function step(key,arg){try{var info=gen[key](arg);var value=info.value}catch(error){reject(error);return}if(info.done){resolve(value)}else{return Promise.resolve(value).then(function(value){step("next",value)},function(err){step("throw",err)})}}return step("next")})}}(function(){var BottleneckError,DLList,RedisStorage,libraries,lua,parser,scripts;parser=require("./parser");DLList=require("./DLList");BottleneckError=require("./BottleneckError");lua=require("./lua.json");libraries={get_time:lua["get_time.lua"],refresh_running:lua["refresh_running.lua"],conditions_check:lua["conditions_check.lua"]};scripts={init:{keys:["b_settings","b_running","b_executing"],libs:[],code:lua["init.lua"]},update_settings:{keys:["b_settings"],libs:[],code:lua["update_settings.lua"]},running:{keys:["b_settings","b_running","b_executing"],libs:["refresh_running"],code:lua["running.lua"]},group_check:{keys:["b_settings"],libs:[],code:lua["group_check.lua"]},check:{keys:["b_settings","b_running","b_executing"],libs:["refresh_running","conditions_check"],code:lua["check.lua"]},submit:{keys:["b_settings","b_running","b_executing"],libs:["refresh_running","conditions_check"],code:lua["submit.lua"]},register:{keys:["b_settings","b_running","b_executing"],libs:["refresh_running","conditions_check"],code:lua["register.lua"]},free:{keys:["b_settings","b_running","b_executing"],libs:["refresh_running"],code:lua["free.lua"]},current_reservoir:{keys:["b_settings"],libs:[],code:lua["current_reservoir.lua"]},increment_reservoir:{keys:["b_settings"],libs:[],code:lua["increment_reservoir.lua"]}};RedisStorage=class RedisStorage{constructor(instance,initSettings,options){var r,redis;this.loadAll=this.loadAll.bind(this);this.instance=instance;r=require;redis=r(function(){return["r","e","d","i","s"].join("")}());parser.load(options,options,this);this.client=redis.createClient(this.clientOptions);this.subClient=redis.createClient(this.clientOptions);this.shas={};this.clients={client:this.client,subscriber:this.subClient};this.isReady=false;this.ready=new this.Promise((resolve,reject)=>{var count,done,errorListener;errorListener=function errorListener(e){return reject(e)};count=0;done=(()=>{count++;if(count===2){[this.client,this.subClient].forEach(client=>{client.removeListener("error",errorListener);return client.on("error",e=>{return this.instance.Events.trigger("error",[e])})});return resolve()}});this.client.on("error",errorListener);this.client.on("ready",function(){return done()});this.subClient.on("error",errorListener);return this.subClient.on("ready",()=>{this.subClient.on("subscribe",function(){return done()});return this.subClient.subscribe("bottleneck")})}).then(this.loadAll).then(()=>{var args;this.subClient.on("message",(channel,message)=>{var info,type;var _message$split=message.split(":");var _message$split2=_slicedToArray(_message$split,2);type=_message$split2[0];info=_message$split2[1];if(type==="freed"){return this.instance._drainAll(~~info)}});initSettings.nextRequest=Date.now();initSettings.running=0;initSettings.unblockTime=0;initSettings.version=this.instance.version;args=this.prepareObject(initSettings);args.unshift(options.clearDatastore?1:0);this.isReady=true;return this.runScript("init",args)}).then(results=>{return this.clients})}disconnect(flush){this.client.end(flush);this.subClient.end(flush);return this}loadScript(name){return new this.Promise((resolve,reject)=>{var payload;payload=scripts[name].libs.map(function(lib){return libraries[lib]}).join("\n")+scripts[name].code;return this.client.multi([["script","load",payload]]).exec((err,replies)=>{if(err!=null){return reject(err)}this.shas[name]=replies[0];return resolve(replies[0])})})}loadAll(){var k,v;return this.Promise.all(function(){var results1;results1=[];for(k in scripts){v=scripts[k];results1.push(this.loadScript(k))}return results1}.call(this))}prepareArray(arr){return arr.map(function(x){if(x!=null){return x.toString()}else{return""}})}prepareObject(obj){var arr,k,v;arr=[];for(k in obj){v=obj[k];arr.push(k,v!=null?v.toString():"")}return arr}runScript(name,args){if(!this.isReady){return this.Promise.reject(new BottleneckError("This limiter is not done connecting to Redis yet. Wait for the 'ready' event to be triggered before submitting requests."))}else{return new this.Promise((resolve,reject)=>{var arr,script;script=scripts[name];arr=[this.shas[name],script.keys.length].concat(script.keys,args,function(err,replies){if(err!=null){return reject(err)}return resolve(replies)});this.instance.Events.trigger("debug",[`Calling Redis script: ${name}.lua`,args]);return this.client.evalsha.bind(this.client).apply({},arr)})}}convertBool(b){return!!b}__updateSettings__(options){var _this=this;return _asyncToGenerator(function*(){return yield _this.runScript("update_settings",_this.prepareObject(options))})()}__running__(){var _this2=this;return _asyncToGenerator(function*(){return yield _this2.runScript("running",[Date.now()])})()}__groupCheck__(){var _this3=this;return _asyncToGenerator(function*(){return parseInt(yield _this3.runScript("group_check",[]),10)})()}__incrementReservoir__(incr){var _this4=this;return _asyncToGenerator(function*(){return yield _this4.runScript("increment_reservoir",[incr])})()}__currentReservoir__(){var _this5=this;return _asyncToGenerator(function*(){return yield _this5.runScript("current_reservoir",[])})()}__check__(weight){var _this6=this;return _asyncToGenerator(function*(){return _this6.convertBool(yield _this6.runScript("check",_this6.prepareArray([weight,Date.now()])))})()}__register__(index,weight,expiration){var _this7=this;return _asyncToGenerator(function*(){var reservoir,success,wait;var _ref=yield _this7.runScript("register",_this7.prepareArray([index,weight,expiration,Date.now()]));var _ref2=_slicedToArray(_ref,3);success=_ref2[0];wait=_ref2[1];reservoir=_ref2[2];return{success:_this7.convertBool(success),wait:wait,reservoir:reservoir}})()}__submit__(queueLength,weight){var _this8=this;return _asyncToGenerator(function*(){var blocked,e,maxConcurrent,overweight,reachedHWM,strategy;try{var _ref3=yield _this8.runScript("submit",_this8.prepareArray([queueLength,weight,Date.now()]));var _ref4=_slicedToArray(_ref3,3);reachedHWM=_ref4[0];blocked=_ref4[1];strategy=_ref4[2];return{reachedHWM:_this8.convertBool(reachedHWM),blocked:_this8.convertBool(blocked),strategy:strategy}}catch(error){e=error;if(e.message.indexOf("OVERWEIGHT")===0){var _e$message$split=e.message.split(":");var _e$message$split2=_slicedToArray(_e$message$split,3);overweight=_e$message$split2[0];weight=_e$message$split2[1];maxConcurrent=_e$message$split2[2];throw new BottleneckError(`Impossible to add a job having a weight of ${weight} to a limiter having a maxConcurrent setting of ${maxConcurrent}`)}else{throw e}}})()}__free__(index,weight){var _this9=this;return _asyncToGenerator(function*(){var result;result=yield _this9.runScript("free",_this9.prepareArray([index,Date.now()]));return{running:result}})()}};module.exports=RedisStorage}).call(undefined)},{"./BottleneckError":2,"./DLList":3,"./lua.json":10,"./parser":11}],8:[function(require,module,exports){"use strict";var _slicedToArray=function(){function sliceIterator(arr,i){var _arr=[];var _n=true;var _d=false;var _e=undefined;try{for(var _i=arr[Symbol.iterator](),_s;!(_n=(_s=_i.next()).done);_n=true){_arr.push(_s.value);if(i&&_arr.length===i)break}}catch(err){_d=true;_e=err}finally{try{if(!_n&&_i["return"])_i["return"]()}finally{if(_d)throw _e}}return _arr}return function(arr,i){if(Array.isArray(arr)){return arr}else if(Symbol.iterator in Object(arr)){return sliceIterator(arr,i)}else{throw new TypeError("Invalid attempt to destructure non-iterable instance")}}}();function _toArray(arr){return Array.isArray(arr)?arr:Array.from(arr)}(function(){var DLList,Sync,splice=[].splice;DLList=require("./DLList");Sync=class Sync{constructor(name){this.submit=this.submit.bind(this);this.schedule=this.schedule.bind(this);this.name=name;this._running=0;this._queue=new DLList}isEmpty(){return this._queue.length===0}_tryToRun(){var next;if(this._running<1&&this._queue.length>0){this._running++;next=this._queue.shift();return next.task.apply({},next.args.concat((...args)=>{var ref;this._running--;this._tryToRun();return(ref=next.cb)!=null?ref.apply({},args):void 0}))}}submit(task,...args){var _ref,_ref2,_splice$call,_splice$call2;var cb,ref;ref=args,_ref=ref,_ref2=_toArray(_ref),args=_ref2.slice(0),_ref,_splice$call=splice.call(args,-1),_splice$call2=_slicedToArray(_splice$call,1),cb=_splice$call2[0],_splice$call;this._queue.push({task:task,args:args,cb:cb});return this._tryToRun()}schedule(task,...args){var wrapped;wrapped=function wrapped(...args){var _ref3,_ref4,_splice$call3,_splice$call4;var cb,ref;ref=args,_ref3=ref,_ref4=_toArray(_ref3),args=_ref4.slice(0),_ref3,_splice$call3=splice.call(args,-1),_splice$call4=_slicedToArray(_splice$call3,1),cb=_splice$call4[0],_splice$call3;return task.apply({},args).then(function(...args){return cb.apply({},Array.prototype.concat(null,args))}).catch(function(...args){return cb.apply({},args)})};return new Promise((resolve,reject)=>{return this.submit.apply({},Array.prototype.concat(wrapped,args,function(...args){return(args[0]!=null?reject:(args.shift(),resolve)).apply({},args)}))})}};module.exports=Sync}).call(undefined)},{"./DLList":3}],9:[function(require,module,exports){"use strict";(function(){module.exports=require("./Bottleneck")}).call(undefined)},{"./Bottleneck":1}],10:[function(require,module,exports){module.exports={"check.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal weight = tonumber(ARGV[1])\nlocal now = tonumber(ARGV[2])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'reservoir',\n 'nextRequest'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal reservoir = tonumber(settings[2])\nlocal nextRequest = tonumber(settings[3])\n\nlocal conditionsCheck = conditions_check(weight, maxConcurrent, running, reservoir)\n\nlocal result = conditionsCheck and nextRequest - now <= 0\n\nreturn result\n","conditions_check.lua":"local conditions_check = function (weight, maxConcurrent, running, reservoir)\n return (\n (maxConcurrent == nil or running + weight <= maxConcurrent) and\n (reservoir == nil or reservoir - weight >= 0)\n )\nend\n","current_reservoir.lua":"local settings_key = KEYS[1]\n\nreturn tonumber(redis.call('hget', settings_key, 'reservoir'))\n","free.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal index = ARGV[1]\nlocal now = ARGV[2]\n\nredis.call('zadd', executing_key, 0, index)\n\nreturn refresh_running(executing_key, running_key, settings_key, now)\n","get_time.lua":"redis.replicate_commands()\n\nlocal get_time = function ()\n local time = redis.call('time')\n\n return tonumber(time[1]..string.sub(time[2], 1, 3))\nend\n","group_check.lua":"local settings_key = KEYS[1]\n\nreturn redis.call('hget', settings_key, 'nextRequest')\n","increment_reservoir.lua":"local settings_key = KEYS[1]\nlocal incr = ARGV[1]\n\nreturn redis.call('hincrby', settings_key, 'reservoir', incr)\n","init.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal clear = tonumber(ARGV[1])\n\nif clear == 1 then\n redis.call('del', settings_key, running_key, executing_key)\nend\n\nif redis.call('exists', settings_key) == 0 then\n local args = {'hmset', settings_key}\n\n for i = 2, #ARGV do\n table.insert(args, ARGV[i])\n end\n\n redis.call(unpack(args))\nend\n\nreturn {}\n","refresh_running.lua":"local refresh_running = function (executing_key, running_key, settings_key, now)\n\n local expired = redis.call('zrangebyscore', executing_key, '-inf', '('..now)\n\n if #expired == 0 then\n return redis.call('hget', settings_key, 'running')\n else\n redis.call('zremrangebyscore', executing_key, '-inf', '('..now)\n\n local args = {'hmget', running_key}\n for i = 1, #expired do\n table.insert(args, expired[i])\n end\n\n local weights = redis.call(unpack(args))\n\n args[1] = 'hdel'\n local deleted = redis.call(unpack(args))\n\n local total = 0\n for i = 1, #weights do\n total = total + (tonumber(weights[i]) or 0)\n end\n local incr = -total\n if total == 0 then\n incr = 0\n else\n redis.call('publish', 'bottleneck', 'freed:'..total)\n end\n\n return redis.call('hincrby', settings_key, 'running', incr)\n end\n\nend\n","register.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal index = ARGV[1]\nlocal weight = tonumber(ARGV[2])\nlocal expiration = tonumber(ARGV[3])\nlocal now = tonumber(ARGV[4])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'reservoir',\n 'nextRequest',\n 'minTime'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal reservoir = tonumber(settings[2])\nlocal nextRequest = tonumber(settings[3])\nlocal minTime = tonumber(settings[4])\n\nif conditions_check(weight, maxConcurrent, running, reservoir) then\n\n if expiration ~= nil then\n redis.call('zadd', executing_key, now + expiration, index)\n end\n redis.call('hset', running_key, index, weight)\n redis.call('hincrby', settings_key, 'running', weight)\n\n local wait = math.max(nextRequest - now, 0)\n\n if reservoir == nil then\n redis.call('hset', settings_key,\n 'nextRequest', now + wait + minTime\n )\n else\n reservoir = reservoir - weight\n redis.call('hmset', settings_key,\n 'reservoir', reservoir,\n 'nextRequest', now + wait + minTime\n )\n end\n\n return {true, wait, reservoir}\n\nelse\n return {false}\nend\n","running.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\nlocal now = ARGV[1]\n\nreturn tonumber(refresh_running(executing_key, running_key, settings_key, now))\n","submit.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal queueLength = tonumber(ARGV[1])\nlocal weight = tonumber(ARGV[2])\nlocal now = tonumber(ARGV[3])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'highWater',\n 'reservoir',\n 'nextRequest',\n 'strategy',\n 'unblockTime',\n 'penalty',\n 'minTime'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal highWater = tonumber(settings[2])\nlocal reservoir = tonumber(settings[3])\nlocal nextRequest = tonumber(settings[4])\nlocal strategy = tonumber(settings[5])\nlocal unblockTime = tonumber(settings[6])\nlocal penalty = tonumber(settings[7])\nlocal minTime = tonumber(settings[8])\n\nif maxConcurrent ~= nil and weight > maxConcurrent then\n return redis.error_reply('OVERWEIGHT:'..weight..':'..maxConcurrent)\nend\n\nlocal reachedHWM = (highWater ~= nil and queueLength == highWater\n and not (\n conditions_check(weight, maxConcurrent, running, reservoir)\n and nextRequest - now <= 0\n )\n)\n\nlocal blocked = strategy == 3 and (reachedHWM or unblockTime >= now)\n\nif blocked then\n local computedPenalty = penalty\n if computedPenalty == nil then\n if minTime == 0 then\n computedPenalty = 5000\n else\n computedPenalty = 15 * minTime\n end\n end\n\n redis.call('hmset', settings_key,\n 'unblockTime', now + computedPenalty,\n 'nextRequest', unblockTime + minTime\n )\nend\n\nreturn {reachedHWM, blocked, strategy}\n","update_settings.lua":"local settings_key = KEYS[1]\n\nlocal args = {'hmset', settings_key}\n\nfor i = 1, #ARGV do\n table.insert(args, ARGV[i])\nend\n\nredis.call(unpack(args))\n\nreturn {}\n"}},{}],11:[function(require,module,exports){"use strict";(function(){exports.load=function(received,defaults,onto={}){var k,ref,v;for(k in defaults){v=defaults[k];onto[k]=(ref=received[k])!=null?ref:v}return onto};exports.overwrite=function(received,defaults,onto={}){var k,v;for(k in received){v=received[k];if(defaults[k]!==void 0){onto[k]=v}}return onto}}).call(undefined)},{}],12:[function(require,module,exports){module.exports={name:"bottleneck",version:"2.1.0",description:"Distributed task scheduler and rate limiter",main:"lib/index.js",typings:"bottleneck.d.ts",scripts:{test:"./node_modules/mocha/bin/mocha test",build:"./scripts/build.sh",compile:"./scripts/build.sh compile"},repository:{type:"git",url:"https://github.com/SGrondin/bottleneck"},keywords:["async rate limiter","rate limiter","rate limiting","async","rate","limiting","limiter","throttle","throttling","load","ddos"],author:{name:"Simon Grondin"},license:"MIT",bugs:{url:"https://github.com/SGrondin/bottleneck/issues"},devDependencies:{"@types/es6-promise":"0.0.33",assert:"1.4.x","babel-core":"^6.26.0","babel-preset-env":"^1.6.1",browserify:"*",coffeescript:"2.2.x","ejs-cli":"2.0.1",mocha:"4.x",redis:"^2.8.0",typescript:"^2.6.2","uglify-es":"3.x"}}},{}]},{},[9]); \ No newline at end of file +(function(){function e(t,n,r){function s(o,u){if(!n[o]){if(!t[o]){var a=typeof require=="function"&&require;if(!u&&a)return a(o,!0);if(i)return i(o,!0);var f=new Error("Cannot find module '"+o+"'");throw f.code="MODULE_NOT_FOUND",f}var l=n[o]={exports:{}};t[o][0].call(l.exports,function(e){var n=t[o][1][e];return s(n?n:e)},l,l.exports,e,t,n,r)}return n[o].exports}var i=typeof require=="function"&&require;for(var o=0;o=ref;i=1<=ref?++j:--j){results.push(new DLList)}return results}chain(_limiter){this._limiter=_limiter;return this}_sanitizePriority(priority){var sProperty;sProperty=~~priority!==priority?DEFAULT_PRIORITY:priority;if(sProperty<0){return 0}else if(sProperty>NUM_PRIORITIES-1){return NUM_PRIORITIES-1}else{return sProperty}}_find(arr,fn){var ref;return(ref=function(){var i,j,len,x;for(i=j=0,len=arr.length;j0})}_randomIndex(){return Math.random().toString(36).slice(2)}check(weight=1){var _this3=this;return _asyncToGenerator(function*(){return yield _this3._store.__check__(weight)})()}_run(next,wait,index){var _this4=this;var completed,done;this.Events.trigger("debug",[`Scheduling ${next.options.id}`,{args:next.args,options:next.options}]);done=false;completed=(()=>{var _ref=_asyncToGenerator(function*(...args){var e,ref,running;if(!done){try{done=true;clearTimeout(_this4._executing[index].expiration);delete _this4._executing[index];_this4.Events.trigger("debug",[`Completed ${next.options.id}`,{args:next.args,options:next.options}]);var _ref2=yield _this4._store.__free__(index,next.options.weight);running=_ref2.running;_this4.Events.trigger("debug",[`Freed ${next.options.id}`,{args:next.args,options:next.options}]);_this4._drainAll().catch(function(e){return _this4.Events.trigger("error",[e])});if(running===0&&_this4.empty()){_this4.Events.trigger("idle",[])}return(ref=next.cb)!=null?ref.apply({},args):void 0}catch(error){e=error;return _this4.Events.trigger("error",[e])}}});return function completed(){return _ref.apply(this,arguments)}})();return this._executing[index]={timeout:setTimeout(()=>{this.Events.trigger("debug",[`Executing ${next.options.id}`,{args:next.args,options:next.options}]);if(this._limiter!=null){return this._limiter.submit.apply(this._limiter,Array.prototype.concat(next.options,next.task,next.args,completed))}else{return next.task.apply({},next.args.concat(completed))}},wait),expiration:next.options.expiration!=null?setTimeout(()=>{return completed(new Bottleneck.prototype.BottleneckError(`This job timed out after ${next.options.expiration} ms.`))},next.options.expiration):void 0,job:next}}_drainOne(freed){return this._registerLock.schedule(()=>{var args,index,options,queue;if(this.queued()===0){return this.Promise.resolve(false)}queue=this._getFirst(this._queues);var _queue$first=queue.first();options=_queue$first.options;args=_queue$first.args;if(freed!=null&&options.weight>freed){return this.Promise.resolve(false)}this.Events.trigger("debug",[`Draining ${options.id}`,{args:args,options:options}]);index=this._randomIndex();return this._store.__register__(index,options.weight,options.expiration).then(({success:success,wait:wait,reservoir:reservoir})=>{var empty,next;this.Events.trigger("debug",[`Drained ${options.id}`,{success:success,args:args,options:options}]);if(success){next=queue.shift();empty=this.empty();if(empty){this.Events.trigger("empty",[])}if(reservoir===0){this.Events.trigger("depleted",[empty])}this._run(next,wait,index)}return this.Promise.resolve(success)})})}_drainAll(freed){return this._drainOne(freed).then(success=>{if(success){return this._drainAll()}else{return this.Promise.resolve(success)}}).catch(e=>{return this.Events.trigger("error",[e])})}submit(...args){var _this5=this;var cb,job,options,ref,ref1,task;if(typeof args[0]==="function"){var _ref3,_ref4,_splice$call,_splice$call2;ref=args,_ref3=ref,_ref4=_toArray(_ref3),task=_ref4[0],args=_ref4.slice(1),_ref3,_splice$call=splice.call(args,-1),_splice$call2=_slicedToArray(_splice$call,1),cb=_splice$call2[0],_splice$call;options=this.jobDefaults}else{var _ref5,_ref6,_splice$call3,_splice$call4;ref1=args,_ref5=ref1,_ref6=_toArray(_ref5),options=_ref6[0],task=_ref6[1],args=_ref6.slice(2),_ref5,_splice$call3=splice.call(args,-1),_splice$call4=_slicedToArray(_splice$call3,1),cb=_splice$call4[0],_splice$call3;options=parser.load(options,this.jobDefaults)}job={options:options,task:task,args:args,cb:cb};options.priority=this._sanitizePriority(options.priority);this.Events.trigger("debug",[`Queueing ${options.id}`,{args:args,options:options}]);return this._submitLock.schedule(_asyncToGenerator(function*(){var blocked,e,reachedHWM,shifted,strategy;try{var _ref8=yield _this5._store.__submit__(_this5.queued(),options.weight);reachedHWM=_ref8.reachedHWM;blocked=_ref8.blocked;strategy=_ref8.strategy;_this5.Events.trigger("debug",[`Queued ${options.id}`,{args:args,options:options,reachedHWM:reachedHWM,blocked:blocked}])}catch(error){e=error;_this5.Events.trigger("debug",[`Could not queue ${options.id}`,{args:args,options:options,error:e}]);job.cb(e);return false}if(blocked){_this5._queues=_this5._makeQueues();_this5.Events.trigger("dropped",[job]);return true}else if(reachedHWM){shifted=strategy===Bottleneck.prototype.strategy.LEAK?_this5._getFirst(_this5._queues.slice(options.priority).reverse()).shift():strategy===Bottleneck.prototype.strategy.OVERFLOW_PRIORITY?_this5._getFirst(_this5._queues.slice(options.priority+1).reverse()).shift():strategy===Bottleneck.prototype.strategy.OVERFLOW?job:void 0;if(shifted!=null){_this5.Events.trigger("dropped",[shifted])}if(shifted==null||strategy===Bottleneck.prototype.strategy.OVERFLOW){return reachedHWM}}_this5._queues[options.priority].push(job);yield _this5._drainAll();return reachedHWM}))}schedule(...args){var options,task,wrapped;if(typeof args[0]==="function"){var _args=args;var _args2=_toArray(_args);task=_args2[0];args=_args2.slice(1);options=this.jobDefaults}else{var _args3=args;var _args4=_toArray(_args3);options=_args4[0];task=_args4[1];args=_args4.slice(2);options=parser.load(options,this.jobDefaults)}wrapped=function wrapped(...args){var _ref9,_ref10,_splice$call5,_splice$call6;var cb,ref;ref=args,_ref9=ref,_ref10=_toArray(_ref9),args=_ref10.slice(0),_ref9,_splice$call5=splice.call(args,-1),_splice$call6=_slicedToArray(_splice$call5,1),cb=_splice$call6[0],_splice$call5;return task.apply({},args).then(function(...args){return cb.apply({},Array.prototype.concat(null,args))}).catch(function(...args){return cb.apply({},args)})};return new this.Promise((resolve,reject)=>{return this.submit.apply({},Array.prototype.concat(options,wrapped,args,function(...args){return(args[0]!=null?reject:(args.shift(),resolve)).apply({},args)})).catch(e=>{return this.Events.trigger("error",[e])})})}wrap(fn){return(...args)=>{return this.schedule.apply({},Array.prototype.concat(fn,args))}}updateSettings(options={}){var _this6=this;return _asyncToGenerator(function*(){yield _this6._store.__updateSettings__(parser.overwrite(options,_this6.storeDefaults));parser.overwrite(options,_this6.instanceDefaults,_this6);_this6._drainAll().catch(function(e){return _this6.Events.trigger("error",[e])});return _this6})()}currentReservoir(){var _this7=this;return _asyncToGenerator(function*(){return yield _this7._store.__currentReservoir__()})()}incrementReservoir(incr=0){var _this8=this;return _asyncToGenerator(function*(){yield _this8._store.__incrementReservoir__(incr);_this8._drainAll().catch(function(e){return _this8.Events.trigger("error",[e])});return _this8})()}}Bottleneck.default=Bottleneck;Bottleneck.version=Bottleneck.prototype.version=packagejson.version;Bottleneck.strategy=Bottleneck.prototype.strategy={LEAK:1,OVERFLOW:2,OVERFLOW_PRIORITY:4,BLOCK:3};Bottleneck.BottleneckError=Bottleneck.prototype.BottleneckError=require("./BottleneckError");Bottleneck.Group=Bottleneck.prototype.Group=require("./Group");Bottleneck.prototype.jobDefaults={priority:DEFAULT_PRIORITY,weight:1,expiration:null,id:""};Bottleneck.prototype.storeDefaults={maxConcurrent:null,minTime:0,highWater:null,strategy:Bottleneck.prototype.strategy.LEAK,penalty:null,reservoir:null};Bottleneck.prototype.storeInstanceDefaults={clientOptions:{},clearDatastore:false,Promise:Promise,_groupTimeout:null};Bottleneck.prototype.instanceDefaults={datastore:"local",id:"",rejectOnDrop:true,Promise:Promise};return Bottleneck}.call(this);module.exports=Bottleneck}).call(undefined)},{"../package.json":12,"./BottleneckError":2,"./DLList":3,"./Events":4,"./Group":5,"./Local":6,"./RedisStorage":7,"./Sync":8,"./parser":11}],2:[function(require,module,exports){"use strict";(function(){var BottleneckError;BottleneckError=class BottleneckError extends Error{};module.exports=BottleneckError}).call(undefined)},{}],3:[function(require,module,exports){"use strict";(function(){var DLList;DLList=class DLList{constructor(){this._first=null;this._last=null;this.length=0}push(value){var node;this.length++;node={value:value,next:null};if(this._last!=null){this._last.next=node;this._last=node}else{this._first=this._last=node}return void 0}shift(){var ref1,value;if(this._first==null){return void 0}else{this.length--}value=this._first.value;this._first=(ref1=this._first.next)!=null?ref1:this._last=null;return value}first(){if(this._first!=null){return this._first.value}}getArray(){var node,ref,results;node=this._first;results=[];while(node!=null){results.push((ref=node,node=node.next,ref.value))}return results}};module.exports=DLList}).call(undefined)},{}],4:[function(require,module,exports){"use strict";(function(){var BottleneckError,Events;BottleneckError=require("./BottleneckError");Events=class Events{constructor(instance){this.instance=instance;this._events={};this.instance.on=((name,cb)=>{return this._addListener(name,"many",cb)});this.instance.once=((name,cb)=>{return this._addListener(name,"once",cb)});this.instance.removeAllListeners=((name=null)=>{if(name!=null){return delete this._events[name]}else{return this._events={}}})}_addListener(name,status,cb){var base;if((base=this._events)[name]==null){base[name]=[]}return this._events[name].push({cb:cb,status:status})}trigger(name,args){if(name!=="debug"){this.trigger("debug",[`Event triggered: ${name}`,args])}if(name==="dropped"&&this.instance.rejectOnDrop){args.forEach(function(job){return job.cb.apply({},[new BottleneckError("This job has been dropped by Bottleneck")])})}if(this._events[name]==null){return}this._events[name]=this._events[name].filter(function(listener){return listener.status!=="none"});return this._events[name].forEach(listener=>{var e,ret;if(listener.status==="none"){return}if(listener.status==="once"){listener.status="none"}try{ret=listener.cb.apply({},args);if(typeof(ret!=null?ret.then:void 0)==="function"){return ret.then(function(){}).catch(e=>{return this.trigger("error",[e])})}}catch(error){e=error;if("name"!=="error"){return this.trigger("error",[e])}}})}};module.exports=Events}).call(undefined)},{"./BottleneckError":2}],5:[function(require,module,exports){"use strict";function _asyncToGenerator(fn){return function(){var gen=fn.apply(this,arguments);return new Promise(function(resolve,reject){function step(key,arg){try{var info=gen[key](arg);var value=info.value}catch(error){reject(error);return}if(info.done){resolve(value)}else{return Promise.resolve(value).then(function(value){step("next",value)},function(err){step("throw",err)})}}return step("next")})}}(function(){var Events,Group,parser;parser=require("./parser");Events=require("./Events");Group=function(){class Group{constructor(limiterOptions={},groupOptions={}){this.key=this.key.bind(this);this.deleteKey=this.deleteKey.bind(this);this.limiters=this.limiters.bind(this);this.keys=this.keys.bind(this);this._startAutoCleanup=this._startAutoCleanup.bind(this);this.updateSettings=this.updateSettings.bind(this);this.limiterOptions=limiterOptions;parser.load(groupOptions,this.defaults,this);this.Events=new Events(this);this.instances={};this.Bottleneck=require("./Bottleneck");this._startAutoCleanup()}key(key=""){var ref;return(ref=this.instances[key])!=null?ref:(()=>{var limiter;limiter=this.instances[key]=new this.Bottleneck(Object.assign(this.limiterOptions,{id:`group-key-${key}`,_groupTimeout:this.timeout}));this.Events.trigger("created",[limiter,key]);return limiter})()}deleteKey(key=""){var ref;if((ref=this.instances[key])!=null){ref.disconnect()}return delete this.instances[key]}limiters(){var k,ref,results,v;ref=this.instances;results=[];for(k in ref){v=ref[k];results.push({key:k,limiter:v})}return results}keys(){return Object.keys(this.instances)}_startAutoCleanup(){var _this=this;var base;clearInterval(this.interval);return typeof(base=this.interval=setInterval(_asyncToGenerator(function*(){var e,k,ref,results,time,v;time=Date.now();ref=_this.instances;results=[];for(k in ref){v=ref[k];try{if(yield v._store.__groupCheck__(time)){results.push(_this.deleteKey(k))}else{results.push(void 0)}}catch(error){e=error;results.push(v.Events.trigger("error",[e]))}}return results}),this.timeout/2)).unref==="function"?base.unref():void 0}updateSettings(options={}){parser.overwrite(options,this.defaults,this);if(options.timeout!=null){return this._startAutoCleanup()}}}Group.prototype.defaults={timeout:1e3*60*5};return Group}.call(this);module.exports=Group}).call(undefined)},{"./Bottleneck":1,"./Events":4,"./parser":11}],6:[function(require,module,exports){"use strict";function _asyncToGenerator(fn){return function(){var gen=fn.apply(this,arguments);return new Promise(function(resolve,reject){function step(key,arg){try{var info=gen[key](arg);var value=info.value}catch(error){reject(error);return}if(info.done){resolve(value)}else{return Promise.resolve(value).then(function(value){step("next",value)},function(err){step("throw",err)})}}return step("next")})}}(function(){var BottleneckError,DLList,Local,parser;parser=require("./parser");DLList=require("./DLList");BottleneckError=require("./BottleneckError");Local=class Local{constructor(options){parser.load(options,options,this);this._nextRequest=Date.now();this._running=0;this._executing={};this._unblockTime=0;this.ready=this.yieldLoop();this.clients={}}disconnect(flush){return this}yieldLoop(t=0){return new this.Promise(function(resolve,reject){return setTimeout(resolve,t)})}computePenalty(){var ref;return(ref=this.penalty)!=null?ref:15*this.minTime||5e3}__updateSettings__(options){var _this=this;return _asyncToGenerator(function*(){yield _this.yieldLoop();parser.overwrite(options,options,_this);return true})()}__running__(){var _this2=this;return _asyncToGenerator(function*(){yield _this2.yieldLoop();return _this2._running})()}__groupCheck__(time){var _this3=this;return _asyncToGenerator(function*(){yield _this3.yieldLoop();return _this3._nextRequest+_this3._groupTimeout=0)}__incrementReservoir__(incr){var _this4=this;return _asyncToGenerator(function*(){yield _this4.yieldLoop();return _this4.reservoir+=incr})()}__currentReservoir__(){var _this5=this;return _asyncToGenerator(function*(){yield _this5.yieldLoop();return _this5.reservoir})()}isBlocked(now){return this._unblockTime>=now}check(weight,now){return this.conditionsCheck(weight)&&this._nextRequest-now<=0}__check__(weight){var _this6=this;return _asyncToGenerator(function*(){var now;yield _this6.yieldLoop();now=Date.now();return _this6.check(weight,now)})()}__register__(index,weight,expiration){var _this7=this;return _asyncToGenerator(function*(){var now,wait;yield _this7.yieldLoop();now=Date.now();if(_this7.conditionsCheck(weight)){_this7._running+=weight;_this7._executing[index]={timeout:expiration!=null?setTimeout(function(){if(!_this7._executing[index].freed){_this7._executing[index].freed=true;return _this7._running-=weight}},expiration):void 0,freed:false};if(_this7.reservoir!=null){_this7.reservoir-=weight}wait=Math.max(_this7._nextRequest-now,0);_this7._nextRequest=now+wait+_this7.minTime;return{success:true,wait:wait,reservoir:_this7.reservoir}}else{return{success:false}}})()}strategyIsBlock(){return this.strategy===3}__submit__(queueLength,weight){var _this8=this;return _asyncToGenerator(function*(){var blocked,now,reachedHWM;yield _this8.yieldLoop();if(_this8.maxConcurrent!=null&&weight>_this8.maxConcurrent){throw new BottleneckError(`Impossible to add a job having a weight of ${weight} to a limiter having a maxConcurrent setting of ${_this8.maxConcurrent}`)}now=Date.now();reachedHWM=_this8.highWater!=null&&queueLength===_this8.highWater&&!_this8.check(weight,now);blocked=_this8.strategyIsBlock()&&(reachedHWM||_this8.isBlocked(now));if(blocked){_this8._unblockTime=now+_this8.computePenalty();_this8._nextRequest=_this8._unblockTime+_this8.minTime}return{reachedHWM:reachedHWM,blocked:blocked,strategy:_this8.strategy}})()}__free__(index,weight){var _this9=this;return _asyncToGenerator(function*(){yield _this9.yieldLoop();clearTimeout(_this9._executing[index].timeout);if(!_this9._executing[index].freed){_this9._executing[index].freed=true;_this9._running-=weight}return{running:_this9._running}})()}};module.exports=Local}).call(undefined)},{"./BottleneckError":2,"./DLList":3,"./parser":11}],7:[function(require,module,exports){"use strict";var _slicedToArray=function(){function sliceIterator(arr,i){var _arr=[];var _n=true;var _d=false;var _e=undefined;try{for(var _i=arr[Symbol.iterator](),_s;!(_n=(_s=_i.next()).done);_n=true){_arr.push(_s.value);if(i&&_arr.length===i)break}}catch(err){_d=true;_e=err}finally{try{if(!_n&&_i["return"])_i["return"]()}finally{if(_d)throw _e}}return _arr}return function(arr,i){if(Array.isArray(arr)){return arr}else if(Symbol.iterator in Object(arr)){return sliceIterator(arr,i)}else{throw new TypeError("Invalid attempt to destructure non-iterable instance")}}}();function _asyncToGenerator(fn){return function(){var gen=fn.apply(this,arguments);return new Promise(function(resolve,reject){function step(key,arg){try{var info=gen[key](arg);var value=info.value}catch(error){reject(error);return}if(info.done){resolve(value)}else{return Promise.resolve(value).then(function(value){step("next",value)},function(err){step("throw",err)})}}return step("next")})}}(function(){var BottleneckError,DLList,RedisStorage,libraries,lua,parser,scriptTemplates;parser=require("./parser");DLList=require("./DLList");BottleneckError=require("./BottleneckError");lua=require("./lua.json");libraries={get_time:lua["get_time.lua"],refresh_running:lua["refresh_running.lua"],conditions_check:lua["conditions_check.lua"],refresh_expiration:lua["refresh_expiration.lua"],validate_keys:lua["validate_keys.lua"]};scriptTemplates=function scriptTemplates(id){return{init:{keys:[`b_${id}_settings`,`b_${id}_running`,`b_${id}_executing`],libs:["refresh_expiration"],code:lua["init.lua"]},update_settings:{keys:[`b_${id}_settings`,`b_${id}_running`,`b_${id}_executing`],libs:["validate_keys","refresh_expiration"],code:lua["update_settings.lua"]},running:{keys:[`b_${id}_settings`,`b_${id}_running`,`b_${id}_executing`],libs:["validate_keys","refresh_running"],code:lua["running.lua"]},group_check:{keys:[`b_${id}_settings`],libs:[],code:lua["group_check.lua"]},check:{keys:[`b_${id}_settings`,`b_${id}_running`,`b_${id}_executing`],libs:["validate_keys","refresh_running","conditions_check"],code:lua["check.lua"]},submit:{keys:[`b_${id}_settings`,`b_${id}_running`,`b_${id}_executing`],libs:["validate_keys","refresh_running","conditions_check","refresh_expiration"],code:lua["submit.lua"]},register:{keys:[`b_${id}_settings`,`b_${id}_running`,`b_${id}_executing`],libs:["validate_keys","refresh_running","conditions_check","refresh_expiration"],code:lua["register.lua"]},free:{keys:[`b_${id}_settings`,`b_${id}_running`,`b_${id}_executing`],libs:["validate_keys","refresh_running"],code:lua["free.lua"]},current_reservoir:{keys:[`b_${id}_settings`],libs:["validate_keys"],code:lua["current_reservoir.lua"]},increment_reservoir:{keys:[`b_${id}_settings`],libs:["validate_keys"],code:lua["increment_reservoir.lua"]}}};RedisStorage=class RedisStorage{constructor(instance,initSettings,options){var r,redis;this.loadAll=this.loadAll.bind(this);this.instance=instance;r=require;redis=r(function(){return["r","e","d","i","s"].join("")}());this.scripts=scriptTemplates(this.instance.id);parser.load(options,options,this);this.client=redis.createClient(this.clientOptions);this.subClient=redis.createClient(this.clientOptions);this.shas={};this.clients={client:this.client,subscriber:this.subClient};this.isReady=false;this.ready=new this.Promise((resolve,reject)=>{var count,done,errorListener;errorListener=function errorListener(e){return reject(e)};count=0;done=(()=>{count++;if(count===2){[this.client,this.subClient].forEach(client=>{client.removeListener("error",errorListener);return client.on("error",e=>{return this.instance.Events.trigger("error",[e])})});return resolve()}});this.client.on("error",errorListener);this.client.on("ready",function(){return done()});this.subClient.on("error",errorListener);return this.subClient.on("ready",()=>{this.subClient.on("subscribe",function(){return done()});return this.subClient.subscribe("bottleneck")})}).then(this.loadAll).then(()=>{var args;this.subClient.on("message",(channel,message)=>{var info,type;var _message$split=message.split(":");var _message$split2=_slicedToArray(_message$split,2);type=_message$split2[0];info=_message$split2[1];if(type==="freed"){return this.instance._drainAll(~~info)}});initSettings.nextRequest=Date.now();initSettings.running=0;initSettings.unblockTime=0;initSettings.version=this.instance.version;initSettings.groupTimeout=this._groupTimeout;args=this.prepareObject(initSettings);args.unshift(options.clearDatastore?1:0);this.isReady=true;return this.runScript("init",args)}).then(results=>{return this.clients})}disconnect(flush){this.client.end(flush);this.subClient.end(flush);return this}loadScript(name){return new this.Promise((resolve,reject)=>{var payload;payload=this.scripts[name].libs.map(function(lib){return libraries[lib]}).join("\n")+this.scripts[name].code;return this.client.multi([["script","load",payload]]).exec((err,replies)=>{if(err!=null){return reject(err)}this.shas[name]=replies[0];return resolve(replies[0])})})}loadAll(){var k,v;return this.Promise.all(function(){var ref,results1;ref=this.scripts;results1=[];for(k in ref){v=ref[k];results1.push(this.loadScript(k))}return results1}.call(this))}prepareArray(arr){return arr.map(function(x){if(x!=null){return x.toString()}else{return""}})}prepareObject(obj){var arr,k,v;arr=[];for(k in obj){v=obj[k];arr.push(k,v!=null?v.toString():"")}return arr}runScript(name,args){var script;if(!this.isReady){return this.Promise.reject(new BottleneckError("This limiter is not done connecting to Redis yet. Wait for the 'ready' event to be triggered before submitting requests."))}else{script=this.scripts[name];return new this.Promise((resolve,reject)=>{var arr;arr=[this.shas[name],script.keys.length].concat(script.keys,args,function(err,replies){if(err!=null){return reject(err)}return resolve(replies)});this.instance.Events.trigger("debug",[`Calling Redis script: ${name}.lua`,args]);return this.client.evalsha.bind(this.client).apply({},arr)}).catch(e=>{if(e.message==="SETTINGS_KEY_NOT_FOUND"){return this.Promise.reject(new BottleneckError(`Bottleneck limiter (id: '${this.instance.id}') could not find the Redis key it needs to complete this action (key '${script.keys[0]}'), was it deleted?${this._groupTimeout!=null?" Note: This limiter is in a Group, it could have been garbage collected.":""}`))}else{return this.Promise.reject(e)}})}}convertBool(b){return!!b}__updateSettings__(options){var _this=this;return _asyncToGenerator(function*(){return yield _this.runScript("update_settings",_this.prepareObject(options))})()}__running__(){var _this2=this;return _asyncToGenerator(function*(){return yield _this2.runScript("running",[Date.now()])})()}__groupCheck__(){var _this3=this;return _asyncToGenerator(function*(){return _this3.convertBool(yield _this3.runScript("group_check",[]))})()}__incrementReservoir__(incr){var _this4=this;return _asyncToGenerator(function*(){return yield _this4.runScript("increment_reservoir",[incr])})()}__currentReservoir__(){var _this5=this;return _asyncToGenerator(function*(){return yield _this5.runScript("current_reservoir",[])})()}__check__(weight){var _this6=this;return _asyncToGenerator(function*(){return _this6.convertBool(yield _this6.runScript("check",_this6.prepareArray([weight,Date.now()])))})()}__register__(index,weight,expiration){var _this7=this;return _asyncToGenerator(function*(){var reservoir,success,wait;var _ref=yield _this7.runScript("register",_this7.prepareArray([index,weight,expiration,Date.now()]));var _ref2=_slicedToArray(_ref,3);success=_ref2[0];wait=_ref2[1];reservoir=_ref2[2];return{success:_this7.convertBool(success),wait:wait,reservoir:reservoir}})()}__submit__(queueLength,weight){var _this8=this;return _asyncToGenerator(function*(){var blocked,e,maxConcurrent,overweight,reachedHWM,strategy;try{var _ref3=yield _this8.runScript("submit",_this8.prepareArray([queueLength,weight,Date.now()]));var _ref4=_slicedToArray(_ref3,3);reachedHWM=_ref4[0];blocked=_ref4[1];strategy=_ref4[2];return{reachedHWM:_this8.convertBool(reachedHWM),blocked:_this8.convertBool(blocked),strategy:strategy}}catch(error){e=error;if(e.message.indexOf("OVERWEIGHT")===0){var _e$message$split=e.message.split(":");var _e$message$split2=_slicedToArray(_e$message$split,3);overweight=_e$message$split2[0];weight=_e$message$split2[1];maxConcurrent=_e$message$split2[2];throw new BottleneckError(`Impossible to add a job having a weight of ${weight} to a limiter having a maxConcurrent setting of ${maxConcurrent}`)}else{throw e}}})()}__free__(index,weight){var _this9=this;return _asyncToGenerator(function*(){var result;result=yield _this9.runScript("free",_this9.prepareArray([index,Date.now()]));return{running:result}})()}};module.exports=RedisStorage}).call(undefined)},{"./BottleneckError":2,"./DLList":3,"./lua.json":10,"./parser":11}],8:[function(require,module,exports){"use strict";var _slicedToArray=function(){function sliceIterator(arr,i){var _arr=[];var _n=true;var _d=false;var _e=undefined;try{for(var _i=arr[Symbol.iterator](),_s;!(_n=(_s=_i.next()).done);_n=true){_arr.push(_s.value);if(i&&_arr.length===i)break}}catch(err){_d=true;_e=err}finally{try{if(!_n&&_i["return"])_i["return"]()}finally{if(_d)throw _e}}return _arr}return function(arr,i){if(Array.isArray(arr)){return arr}else if(Symbol.iterator in Object(arr)){return sliceIterator(arr,i)}else{throw new TypeError("Invalid attempt to destructure non-iterable instance")}}}();function _toArray(arr){return Array.isArray(arr)?arr:Array.from(arr)}(function(){var DLList,Sync,splice=[].splice;DLList=require("./DLList");Sync=class Sync{constructor(name){this.submit=this.submit.bind(this);this.schedule=this.schedule.bind(this);this.name=name;this._running=0;this._queue=new DLList}isEmpty(){return this._queue.length===0}_tryToRun(){var next;if(this._running<1&&this._queue.length>0){this._running++;next=this._queue.shift();return next.task.apply({},next.args.concat((...args)=>{var ref;this._running--;this._tryToRun();return(ref=next.cb)!=null?ref.apply({},args):void 0}))}}submit(task,...args){var _ref,_ref2,_splice$call,_splice$call2;var cb,ref;ref=args,_ref=ref,_ref2=_toArray(_ref),args=_ref2.slice(0),_ref,_splice$call=splice.call(args,-1),_splice$call2=_slicedToArray(_splice$call,1),cb=_splice$call2[0],_splice$call;this._queue.push({task:task,args:args,cb:cb});return this._tryToRun()}schedule(task,...args){var wrapped;wrapped=function wrapped(...args){var _ref3,_ref4,_splice$call3,_splice$call4;var cb,ref;ref=args,_ref3=ref,_ref4=_toArray(_ref3),args=_ref4.slice(0),_ref3,_splice$call3=splice.call(args,-1),_splice$call4=_slicedToArray(_splice$call3,1),cb=_splice$call4[0],_splice$call3;return task.apply({},args).then(function(...args){return cb.apply({},Array.prototype.concat(null,args))}).catch(function(...args){return cb.apply({},args)})};return new Promise((resolve,reject)=>{return this.submit.apply({},Array.prototype.concat(wrapped,args,function(...args){return(args[0]!=null?reject:(args.shift(),resolve)).apply({},args)}))})}};module.exports=Sync}).call(undefined)},{"./DLList":3}],9:[function(require,module,exports){"use strict";(function(){module.exports=require("./Bottleneck")}).call(undefined)},{"./Bottleneck":1}],10:[function(require,module,exports){module.exports={"check.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal weight = tonumber(ARGV[1])\nlocal now = tonumber(ARGV[2])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'reservoir',\n 'nextRequest'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal reservoir = tonumber(settings[2])\nlocal nextRequest = tonumber(settings[3])\n\nlocal conditionsCheck = conditions_check(weight, maxConcurrent, running, reservoir)\n\nlocal result = conditionsCheck and nextRequest - now <= 0\n\nreturn result\n","conditions_check.lua":"local conditions_check = function (weight, maxConcurrent, running, reservoir)\n return (\n (maxConcurrent == nil or running + weight <= maxConcurrent) and\n (reservoir == nil or reservoir - weight >= 0)\n )\nend\n","current_reservoir.lua":"local settings_key = KEYS[1]\n\nreturn tonumber(redis.call('hget', settings_key, 'reservoir'))\n","free.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal index = ARGV[1]\nlocal now = ARGV[2]\n\nredis.call('zadd', executing_key, 0, index)\n\nreturn refresh_running(executing_key, running_key, settings_key, now)\n","get_time.lua":"redis.replicate_commands()\n\nlocal get_time = function ()\n local time = redis.call('time')\n\n return tonumber(time[1]..string.sub(time[2], 1, 3))\nend\n","group_check.lua":"local settings_key = KEYS[1]\n\nreturn not (redis.call('exists', settings_key) == 1)\n","increment_reservoir.lua":"local settings_key = KEYS[1]\nlocal incr = ARGV[1]\n\nreturn redis.call('hincrby', settings_key, 'reservoir', incr)\n","init.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal clear = tonumber(ARGV[1])\n\nif clear == 1 then\n redis.call('del', settings_key, running_key, executing_key)\nend\n\nif redis.call('exists', settings_key) == 0 then\n local args = {'hmset', settings_key}\n\n for i = 2, #ARGV do\n table.insert(args, ARGV[i])\n end\n\n redis.call(unpack(args))\nend\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(executing_key, running_key, settings_key, 0, 0, groupTimeout)\n\nreturn {}\n","refresh_expiration.lua":"local refresh_expiration = function (executing_key, running_key, settings_key, now, nextRequest, groupTimeout)\n\n if groupTimeout ~= nil then\n local ttl = (nextRequest + groupTimeout) - now\n\n redis.call('pexpire', executing_key, ttl)\n redis.call('pexpire', running_key, ttl)\n redis.call('pexpire', settings_key, ttl)\n end\n\nend\n","refresh_running.lua":"local refresh_running = function (executing_key, running_key, settings_key, now)\n\n local expired = redis.call('zrangebyscore', executing_key, '-inf', '('..now)\n\n if #expired == 0 then\n return redis.call('hget', settings_key, 'running')\n else\n redis.call('zremrangebyscore', executing_key, '-inf', '('..now)\n\n local args = {'hmget', running_key}\n for i = 1, #expired do\n table.insert(args, expired[i])\n end\n\n local weights = redis.call(unpack(args))\n\n args[1] = 'hdel'\n local deleted = redis.call(unpack(args))\n\n local total = 0\n for i = 1, #weights do\n total = total + (tonumber(weights[i]) or 0)\n end\n local incr = -total\n if total == 0 then\n incr = 0\n else\n redis.call('publish', 'bottleneck', 'freed:'..total)\n end\n\n return redis.call('hincrby', settings_key, 'running', incr)\n end\n\nend\n","register.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal index = ARGV[1]\nlocal weight = tonumber(ARGV[2])\nlocal expiration = tonumber(ARGV[3])\nlocal now = tonumber(ARGV[4])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'reservoir',\n 'nextRequest',\n 'minTime',\n 'groupTimeout'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal reservoir = tonumber(settings[2])\nlocal nextRequest = tonumber(settings[3])\nlocal minTime = tonumber(settings[4])\nlocal groupTimeout = tonumber(settings[5])\n\nif conditions_check(weight, maxConcurrent, running, reservoir) then\n\n if expiration ~= nil then\n redis.call('zadd', executing_key, now + expiration, index)\n end\n redis.call('hset', running_key, index, weight)\n redis.call('hincrby', settings_key, 'running', weight)\n\n local wait = math.max(nextRequest - now, 0)\n local newNextRequest = now + wait + minTime\n\n if reservoir == nil then\n redis.call('hset', settings_key,\n 'nextRequest', newNextRequest\n )\n else\n reservoir = reservoir - weight\n redis.call('hmset', settings_key,\n 'reservoir', reservoir,\n 'nextRequest', newNextRequest\n )\n end\n\n refresh_expiration(executing_key, running_key, settings_key, now, newNextRequest, groupTimeout)\n\n return {true, wait, reservoir}\n\nelse\n return {false}\nend\n","running.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\nlocal now = ARGV[1]\n\nreturn tonumber(refresh_running(executing_key, running_key, settings_key, now))\n","submit.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal queueLength = tonumber(ARGV[1])\nlocal weight = tonumber(ARGV[2])\nlocal now = tonumber(ARGV[3])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'highWater',\n 'reservoir',\n 'nextRequest',\n 'strategy',\n 'unblockTime',\n 'penalty',\n 'minTime',\n 'groupTimeout'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal highWater = tonumber(settings[2])\nlocal reservoir = tonumber(settings[3])\nlocal nextRequest = tonumber(settings[4])\nlocal strategy = tonumber(settings[5])\nlocal unblockTime = tonumber(settings[6])\nlocal penalty = tonumber(settings[7])\nlocal minTime = tonumber(settings[8])\nlocal groupTimeout = tonumber(settings[9])\n\nif maxConcurrent ~= nil and weight > maxConcurrent then\n return redis.error_reply('OVERWEIGHT:'..weight..':'..maxConcurrent)\nend\n\nlocal reachedHWM = (highWater ~= nil and queueLength == highWater\n and not (\n conditions_check(weight, maxConcurrent, running, reservoir)\n and nextRequest - now <= 0\n )\n)\n\nlocal blocked = strategy == 3 and (reachedHWM or unblockTime >= now)\n\nif blocked then\n local computedPenalty = penalty\n if computedPenalty == nil then\n if minTime == 0 then\n computedPenalty = 5000\n else\n computedPenalty = 15 * minTime\n end\n end\n\n local newNextRequest = unblockTime + minTime\n\n redis.call('hmset', settings_key,\n 'unblockTime', now + computedPenalty,\n 'nextRequest', newNextRequest\n )\n\n refresh_expiration(executing_key, running_key, settings_key, now, newNextRequest, groupTimeout)\nend\n\nreturn {reachedHWM, blocked, strategy}\n","update_settings.lua":"local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal args = {'hmset', settings_key}\n\nfor i = 1, #ARGV do\n table.insert(args, ARGV[i])\nend\n\nredis.call(unpack(args))\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(executing_key, running_key, settings_key, 0, 0, groupTimeout)\n\nreturn {}\n","validate_keys.lua":"local settings_key = KEYS[1]\n\nif not (redis.call('exists', settings_key) == 1) then\n return redis.error_reply('SETTINGS_KEY_NOT_FOUND')\nend\n"}},{}],11:[function(require,module,exports){"use strict";(function(){exports.load=function(received,defaults,onto={}){var k,ref,v;for(k in defaults){v=defaults[k];onto[k]=(ref=received[k])!=null?ref:v}return onto};exports.overwrite=function(received,defaults,onto={}){var k,v;for(k in received){v=received[k];if(defaults[k]!==void 0){onto[k]=v}}return onto}}).call(undefined)},{}],12:[function(require,module,exports){module.exports={name:"bottleneck",version:"2.1.0",description:"Distributed task scheduler and rate limiter",main:"lib/index.js",typings:"bottleneck.d.ts",scripts:{test:"./node_modules/mocha/bin/mocha test",build:"./scripts/build.sh",compile:"./scripts/build.sh compile"},repository:{type:"git",url:"https://github.com/SGrondin/bottleneck"},keywords:["async rate limiter","rate limiter","rate limiting","async","rate","limiting","limiter","throttle","throttling","load","ddos"],author:{name:"Simon Grondin"},license:"MIT",bugs:{url:"https://github.com/SGrondin/bottleneck/issues"},devDependencies:{"@types/es6-promise":"0.0.33",assert:"1.4.x","babel-core":"^6.26.0","babel-preset-env":"^1.6.1",browserify:"*",coffeescript:"2.2.x","ejs-cli":"2.0.1",mocha:"4.x",redis:"^2.8.0",typescript:"^2.6.2","uglify-es":"3.x"}}},{}]},{},[9]); \ No newline at end of file diff --git a/lib/Bottleneck.js b/lib/Bottleneck.js index bfd2f6f..dbe0612 100644 --- a/lib/Bottleneck.js +++ b/lib/Bottleneck.js @@ -463,7 +463,8 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a Bottleneck.prototype.storeInstanceDefaults = { clientOptions: {}, clearDatastore: false, - Promise: Promise + Promise: Promise, + _groupTimeout: null }; Bottleneck.prototype.instanceDefaults = { diff --git a/lib/Group.js b/lib/Group.js index b190446..47916d4 100644 --- a/lib/Group.js +++ b/lib/Group.js @@ -4,14 +4,12 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a // Generated by CoffeeScript 2.2.2 (function () { - var BottleneckError, Events, Group, parser; + var Events, Group, parser; parser = require("./parser"); Events = require("./Events"); - BottleneckError = require("./BottleneckError"); - Group = function () { class Group { constructor(limiterOptions = {}, groupOptions = {}) { @@ -19,25 +17,24 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a this.deleteKey = this.deleteKey.bind(this); this.limiters = this.limiters.bind(this); this.keys = this.keys.bind(this); - this.startAutoCleanup = this.startAutoCleanup.bind(this); - this.stopAutoCleanup = this.stopAutoCleanup.bind(this); + this._startAutoCleanup = this._startAutoCleanup.bind(this); this.updateSettings = this.updateSettings.bind(this); this.limiterOptions = limiterOptions; - if (this.limiterOptions.datastore === "redis") { - throw new BottleneckError("Groups do not currently support Clustering. This will be implemented in a future version. Please open an issue at https://github.com/SGrondin/bottleneck/issues if you would like this feature to be implemented."); - } parser.load(groupOptions, this.defaults, this); this.Events = new Events(this); this.instances = {}; this.Bottleneck = require("./Bottleneck"); - this.startAutoCleanup(); + this._startAutoCleanup(); } key(key = "") { var ref; return (ref = this.instances[key]) != null ? ref : (() => { var limiter; - limiter = this.instances[key] = new this.Bottleneck(this.limiterOptions); + limiter = this.instances[key] = new this.Bottleneck(Object.assign(this.limiterOptions, { + id: `group-key-${key}`, + _groupTimeout: this.timeout + })); this.Events.trigger("created", [limiter, key]); return limiter; })(); @@ -69,42 +66,37 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a return Object.keys(this.instances); } - startAutoCleanup() { + _startAutoCleanup() { var _this = this; var base; - this.stopAutoCleanup(); + clearInterval(this.interval); return typeof (base = this.interval = setInterval(_asyncToGenerator(function* () { - var check, e, k, ref, results, time, v; + var e, k, ref, results, time, v; time = Date.now(); ref = _this.instances; results = []; for (k in ref) { v = ref[k]; try { - check = yield v._store.__groupCheck__(); - if (check + _this.timeout < time) { + if (yield v._store.__groupCheck__(time)) { results.push(_this.deleteKey(k)); } else { results.push(void 0); } } catch (error) { e = error; - results.push(v._trigger("error", [e])); + results.push(v.Events.trigger("error", [e])); } } return results; }), this.timeout / 2)).unref === "function" ? base.unref() : void 0; } - stopAutoCleanup() { - return clearInterval(this.interval); - } - updateSettings(options = {}) { parser.overwrite(options, this.defaults, this); if (options.timeout != null) { - return this.startAutoCleanup(); + return this._startAutoCleanup(); } } diff --git a/lib/Local.js b/lib/Local.js index a82d421..235711c 100644 --- a/lib/Local.js +++ b/lib/Local.js @@ -57,12 +57,12 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a })(); } - __groupCheck__() { + __groupCheck__(time) { var _this3 = this; return _asyncToGenerator(function* () { yield _this3.yieldLoop(); - return _this3._nextRequest; + return _this3._nextRequest + _this3._groupTimeout < time; })(); } diff --git a/lib/RedisStorage.js b/lib/RedisStorage.js index 6c05b9a..f2641e1 100644 --- a/lib/RedisStorage.js +++ b/lib/RedisStorage.js @@ -6,7 +6,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a // Generated by CoffeeScript 2.2.2 (function () { - var BottleneckError, DLList, RedisStorage, libraries, lua, parser, scripts; + var BottleneckError, DLList, RedisStorage, libraries, lua, parser, scriptTemplates; parser = require("./parser"); @@ -19,60 +19,64 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a libraries = { get_time: lua["get_time.lua"], refresh_running: lua["refresh_running.lua"], - conditions_check: lua["conditions_check.lua"] + conditions_check: lua["conditions_check.lua"], + refresh_expiration: lua["refresh_expiration.lua"], + validate_keys: lua["validate_keys.lua"] }; - scripts = { - init: { - keys: ["b_settings", "b_running", "b_executing"], - libs: [], - code: lua["init.lua"] - }, - update_settings: { - keys: ["b_settings"], - libs: [], - code: lua["update_settings.lua"] - }, - running: { - keys: ["b_settings", "b_running", "b_executing"], - libs: ["refresh_running"], - code: lua["running.lua"] - }, - group_check: { - keys: ["b_settings"], - libs: [], - code: lua["group_check.lua"] - }, - check: { - keys: ["b_settings", "b_running", "b_executing"], - libs: ["refresh_running", "conditions_check"], - code: lua["check.lua"] - }, - submit: { - keys: ["b_settings", "b_running", "b_executing"], - libs: ["refresh_running", "conditions_check"], - code: lua["submit.lua"] - }, - register: { - keys: ["b_settings", "b_running", "b_executing"], - libs: ["refresh_running", "conditions_check"], - code: lua["register.lua"] - }, - free: { - keys: ["b_settings", "b_running", "b_executing"], - libs: ["refresh_running"], - code: lua["free.lua"] - }, - current_reservoir: { - keys: ["b_settings"], - libs: [], - code: lua["current_reservoir.lua"] - }, - increment_reservoir: { - keys: ["b_settings"], - libs: [], - code: lua["increment_reservoir.lua"] - } + scriptTemplates = function scriptTemplates(id) { + return { + init: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["refresh_expiration"], + code: lua["init.lua"] + }, + update_settings: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["validate_keys", "refresh_expiration"], + code: lua["update_settings.lua"] + }, + running: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["validate_keys", "refresh_running"], + code: lua["running.lua"] + }, + group_check: { + keys: [`b_${id}_settings`], + libs: [], + code: lua["group_check.lua"] + }, + check: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["validate_keys", "refresh_running", "conditions_check"], + code: lua["check.lua"] + }, + submit: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["validate_keys", "refresh_running", "conditions_check", "refresh_expiration"], + code: lua["submit.lua"] + }, + register: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["validate_keys", "refresh_running", "conditions_check", "refresh_expiration"], + code: lua["register.lua"] + }, + free: { + keys: [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`], + libs: ["validate_keys", "refresh_running"], + code: lua["free.lua"] + }, + current_reservoir: { + keys: [`b_${id}_settings`], + libs: ["validate_keys"], + code: lua["current_reservoir.lua"] + }, + increment_reservoir: { + keys: [`b_${id}_settings`], + libs: ["validate_keys"], + code: lua["increment_reservoir.lua"] + } + }; }; RedisStorage = class RedisStorage { @@ -84,6 +88,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a redis = r(function () { return ["r", "e", "d", "i", "s"].join(""); // Obfuscated or else Webpack/Angular will try to inline the optional redis module }()); + this.scripts = scriptTemplates(this.instance.id); parser.load(options, options, this); this.client = redis.createClient(this.clientOptions); this.subClient = redis.createClient(this.clientOptions); @@ -142,6 +147,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a initSettings.running = 0; initSettings.unblockTime = 0; initSettings.version = this.instance.version; + initSettings.groupTimeout = this._groupTimeout; args = this.prepareObject(initSettings); args.unshift(options.clearDatastore ? 1 : 0); this.isReady = true; @@ -160,9 +166,9 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a loadScript(name) { return new this.Promise((resolve, reject) => { var payload; - payload = scripts[name].libs.map(function (lib) { + payload = this.scripts[name].libs.map(function (lib) { return libraries[lib]; - }).join("\n") + scripts[name].code; + }).join("\n") + this.scripts[name].code; return this.client.multi([["script", "load", payload]]).exec((err, replies) => { if (err != null) { return reject(err); @@ -176,10 +182,11 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a loadAll() { var k, v; return this.Promise.all(function () { - var results1; + var ref, results1; + ref = this.scripts; results1 = []; - for (k in scripts) { - v = scripts[k]; + for (k in ref) { + v = ref[k]; results1.push(this.loadScript(k)); } return results1; @@ -207,12 +214,13 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a } runScript(name, args) { + var script; if (!this.isReady) { return this.Promise.reject(new BottleneckError("This limiter is not done connecting to Redis yet. Wait for the 'ready' event to be triggered before submitting requests.")); } else { + script = this.scripts[name]; return new this.Promise((resolve, reject) => { - var arr, script; - script = scripts[name]; + var arr; arr = [this.shas[name], script.keys.length].concat(script.keys, args, function (err, replies) { if (err != null) { return reject(err); @@ -221,6 +229,12 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a }); this.instance.Events.trigger("debug", [`Calling Redis script: ${name}.lua`, args]); return this.client.evalsha.bind(this.client).apply({}, arr); + }).catch(e => { + if (e.message === "SETTINGS_KEY_NOT_FOUND") { + return this.Promise.reject(new BottleneckError(`Bottleneck limiter (id: '${this.instance.id}') could not find the Redis key it needs to complete this action (key '${script.keys[0]}'), was it deleted?${this._groupTimeout != null ? ' Note: This limiter is in a Group, it could have been garbage collected.' : ''}`)); + } else { + return this.Promise.reject(e); + } }); } } @@ -249,7 +263,7 @@ function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, a var _this3 = this; return _asyncToGenerator(function* () { - return parseInt((yield _this3.runScript("group_check", [])), 10); + return _this3.convertBool((yield _this3.runScript("group_check", []))); })(); } diff --git a/lib/lua.json b/lib/lua.json index 57c1781..e2c790c 100644 --- a/lib/lua.json +++ b/lib/lua.json @@ -4,12 +4,14 @@ "current_reservoir.lua": "local settings_key = KEYS[1]\n\nreturn tonumber(redis.call('hget', settings_key, 'reservoir'))\n", "free.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal index = ARGV[1]\nlocal now = ARGV[2]\n\nredis.call('zadd', executing_key, 0, index)\n\nreturn refresh_running(executing_key, running_key, settings_key, now)\n", "get_time.lua": "redis.replicate_commands()\n\nlocal get_time = function ()\n local time = redis.call('time')\n\n return tonumber(time[1]..string.sub(time[2], 1, 3))\nend\n", - "group_check.lua": "local settings_key = KEYS[1]\n\nreturn redis.call('hget', settings_key, 'nextRequest')\n", + "group_check.lua": "local settings_key = KEYS[1]\n\nreturn not (redis.call('exists', settings_key) == 1)\n", "increment_reservoir.lua": "local settings_key = KEYS[1]\nlocal incr = ARGV[1]\n\nreturn redis.call('hincrby', settings_key, 'reservoir', incr)\n", - "init.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal clear = tonumber(ARGV[1])\n\nif clear == 1 then\n redis.call('del', settings_key, running_key, executing_key)\nend\n\nif redis.call('exists', settings_key) == 0 then\n local args = {'hmset', settings_key}\n\n for i = 2, #ARGV do\n table.insert(args, ARGV[i])\n end\n\n redis.call(unpack(args))\nend\n\nreturn {}\n", + "init.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal clear = tonumber(ARGV[1])\n\nif clear == 1 then\n redis.call('del', settings_key, running_key, executing_key)\nend\n\nif redis.call('exists', settings_key) == 0 then\n local args = {'hmset', settings_key}\n\n for i = 2, #ARGV do\n table.insert(args, ARGV[i])\n end\n\n redis.call(unpack(args))\nend\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(executing_key, running_key, settings_key, 0, 0, groupTimeout)\n\nreturn {}\n", + "refresh_expiration.lua": "local refresh_expiration = function (executing_key, running_key, settings_key, now, nextRequest, groupTimeout)\n\n if groupTimeout ~= nil then\n local ttl = (nextRequest + groupTimeout) - now\n\n redis.call('pexpire', executing_key, ttl)\n redis.call('pexpire', running_key, ttl)\n redis.call('pexpire', settings_key, ttl)\n end\n\nend\n", "refresh_running.lua": "local refresh_running = function (executing_key, running_key, settings_key, now)\n\n local expired = redis.call('zrangebyscore', executing_key, '-inf', '('..now)\n\n if #expired == 0 then\n return redis.call('hget', settings_key, 'running')\n else\n redis.call('zremrangebyscore', executing_key, '-inf', '('..now)\n\n local args = {'hmget', running_key}\n for i = 1, #expired do\n table.insert(args, expired[i])\n end\n\n local weights = redis.call(unpack(args))\n\n args[1] = 'hdel'\n local deleted = redis.call(unpack(args))\n\n local total = 0\n for i = 1, #weights do\n total = total + (tonumber(weights[i]) or 0)\n end\n local incr = -total\n if total == 0 then\n incr = 0\n else\n redis.call('publish', 'bottleneck', 'freed:'..total)\n end\n\n return redis.call('hincrby', settings_key, 'running', incr)\n end\n\nend\n", - "register.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal index = ARGV[1]\nlocal weight = tonumber(ARGV[2])\nlocal expiration = tonumber(ARGV[3])\nlocal now = tonumber(ARGV[4])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'reservoir',\n 'nextRequest',\n 'minTime'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal reservoir = tonumber(settings[2])\nlocal nextRequest = tonumber(settings[3])\nlocal minTime = tonumber(settings[4])\n\nif conditions_check(weight, maxConcurrent, running, reservoir) then\n\n if expiration ~= nil then\n redis.call('zadd', executing_key, now + expiration, index)\n end\n redis.call('hset', running_key, index, weight)\n redis.call('hincrby', settings_key, 'running', weight)\n\n local wait = math.max(nextRequest - now, 0)\n\n if reservoir == nil then\n redis.call('hset', settings_key,\n 'nextRequest', now + wait + minTime\n )\n else\n reservoir = reservoir - weight\n redis.call('hmset', settings_key,\n 'reservoir', reservoir,\n 'nextRequest', now + wait + minTime\n )\n end\n\n return {true, wait, reservoir}\n\nelse\n return {false}\nend\n", + "register.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal index = ARGV[1]\nlocal weight = tonumber(ARGV[2])\nlocal expiration = tonumber(ARGV[3])\nlocal now = tonumber(ARGV[4])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'reservoir',\n 'nextRequest',\n 'minTime',\n 'groupTimeout'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal reservoir = tonumber(settings[2])\nlocal nextRequest = tonumber(settings[3])\nlocal minTime = tonumber(settings[4])\nlocal groupTimeout = tonumber(settings[5])\n\nif conditions_check(weight, maxConcurrent, running, reservoir) then\n\n if expiration ~= nil then\n redis.call('zadd', executing_key, now + expiration, index)\n end\n redis.call('hset', running_key, index, weight)\n redis.call('hincrby', settings_key, 'running', weight)\n\n local wait = math.max(nextRequest - now, 0)\n local newNextRequest = now + wait + minTime\n\n if reservoir == nil then\n redis.call('hset', settings_key,\n 'nextRequest', newNextRequest\n )\n else\n reservoir = reservoir - weight\n redis.call('hmset', settings_key,\n 'reservoir', reservoir,\n 'nextRequest', newNextRequest\n )\n end\n\n refresh_expiration(executing_key, running_key, settings_key, now, newNextRequest, groupTimeout)\n\n return {true, wait, reservoir}\n\nelse\n return {false}\nend\n", "running.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\nlocal now = ARGV[1]\n\nreturn tonumber(refresh_running(executing_key, running_key, settings_key, now))\n", - "submit.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal queueLength = tonumber(ARGV[1])\nlocal weight = tonumber(ARGV[2])\nlocal now = tonumber(ARGV[3])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'highWater',\n 'reservoir',\n 'nextRequest',\n 'strategy',\n 'unblockTime',\n 'penalty',\n 'minTime'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal highWater = tonumber(settings[2])\nlocal reservoir = tonumber(settings[3])\nlocal nextRequest = tonumber(settings[4])\nlocal strategy = tonumber(settings[5])\nlocal unblockTime = tonumber(settings[6])\nlocal penalty = tonumber(settings[7])\nlocal minTime = tonumber(settings[8])\n\nif maxConcurrent ~= nil and weight > maxConcurrent then\n return redis.error_reply('OVERWEIGHT:'..weight..':'..maxConcurrent)\nend\n\nlocal reachedHWM = (highWater ~= nil and queueLength == highWater\n and not (\n conditions_check(weight, maxConcurrent, running, reservoir)\n and nextRequest - now <= 0\n )\n)\n\nlocal blocked = strategy == 3 and (reachedHWM or unblockTime >= now)\n\nif blocked then\n local computedPenalty = penalty\n if computedPenalty == nil then\n if minTime == 0 then\n computedPenalty = 5000\n else\n computedPenalty = 15 * minTime\n end\n end\n\n redis.call('hmset', settings_key,\n 'unblockTime', now + computedPenalty,\n 'nextRequest', unblockTime + minTime\n )\nend\n\nreturn {reachedHWM, blocked, strategy}\n", - "update_settings.lua": "local settings_key = KEYS[1]\n\nlocal args = {'hmset', settings_key}\n\nfor i = 1, #ARGV do\n table.insert(args, ARGV[i])\nend\n\nredis.call(unpack(args))\n\nreturn {}\n" + "submit.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal queueLength = tonumber(ARGV[1])\nlocal weight = tonumber(ARGV[2])\nlocal now = tonumber(ARGV[3])\n\nlocal running = tonumber(refresh_running(executing_key, running_key, settings_key, now))\nlocal settings = redis.call('hmget', settings_key,\n 'maxConcurrent',\n 'highWater',\n 'reservoir',\n 'nextRequest',\n 'strategy',\n 'unblockTime',\n 'penalty',\n 'minTime',\n 'groupTimeout'\n)\nlocal maxConcurrent = tonumber(settings[1])\nlocal highWater = tonumber(settings[2])\nlocal reservoir = tonumber(settings[3])\nlocal nextRequest = tonumber(settings[4])\nlocal strategy = tonumber(settings[5])\nlocal unblockTime = tonumber(settings[6])\nlocal penalty = tonumber(settings[7])\nlocal minTime = tonumber(settings[8])\nlocal groupTimeout = tonumber(settings[9])\n\nif maxConcurrent ~= nil and weight > maxConcurrent then\n return redis.error_reply('OVERWEIGHT:'..weight..':'..maxConcurrent)\nend\n\nlocal reachedHWM = (highWater ~= nil and queueLength == highWater\n and not (\n conditions_check(weight, maxConcurrent, running, reservoir)\n and nextRequest - now <= 0\n )\n)\n\nlocal blocked = strategy == 3 and (reachedHWM or unblockTime >= now)\n\nif blocked then\n local computedPenalty = penalty\n if computedPenalty == nil then\n if minTime == 0 then\n computedPenalty = 5000\n else\n computedPenalty = 15 * minTime\n end\n end\n\n local newNextRequest = unblockTime + minTime\n\n redis.call('hmset', settings_key,\n 'unblockTime', now + computedPenalty,\n 'nextRequest', newNextRequest\n )\n\n refresh_expiration(executing_key, running_key, settings_key, now, newNextRequest, groupTimeout)\nend\n\nreturn {reachedHWM, blocked, strategy}\n", + "update_settings.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal args = {'hmset', settings_key}\n\nfor i = 1, #ARGV do\n table.insert(args, ARGV[i])\nend\n\nredis.call(unpack(args))\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(executing_key, running_key, settings_key, 0, 0, groupTimeout)\n\nreturn {}\n", + "validate_keys.lua": "local settings_key = KEYS[1]\n\nif not (redis.call('exists', settings_key) == 1) then\n return redis.error_reply('SETTINGS_KEY_NOT_FOUND')\nend\n" } diff --git a/src/Bottleneck.coffee b/src/Bottleneck.coffee index b4e7809..abb3733 100644 --- a/src/Bottleneck.coffee +++ b/src/Bottleneck.coffee @@ -28,7 +28,8 @@ class Bottleneck storeInstanceDefaults: clientOptions: {}, clearDatastore: false, - Promise: Promise + Promise: Promise, + _groupTimeout: null instanceDefaults: datastore: "local", id: "", diff --git a/src/Group.coffee b/src/Group.coffee index e8faae2..231bfef 100644 --- a/src/Group.coffee +++ b/src/Group.coffee @@ -1,17 +1,15 @@ parser = require "./parser" Events = require "./Events" -BottleneckError = require "./BottleneckError" class Group defaults: { timeout: 1000 * 60 * 5 } constructor: (@limiterOptions={}, groupOptions={}) -> - if @limiterOptions.datastore == "redis" then throw new BottleneckError "Groups do not currently support Clustering. This will be implemented in a future version. Please open an issue at https://github.com/SGrondin/bottleneck/issues if you would like this feature to be implemented." parser.load groupOptions, @defaults, @ @Events = new Events @ @instances = {} @Bottleneck = require "./Bottleneck" - @startAutoCleanup() + @_startAutoCleanup() key: (key="") => @instances[key] ? do => - limiter = @instances[key] = new @Bottleneck @limiterOptions + limiter = @instances[key] = new @Bottleneck Object.assign @limiterOptions, { id: "group-key-#{key}", _groupTimeout: @timeout } @Events.trigger "created", [limiter, key] limiter deleteKey: (key="") => @@ -20,20 +18,16 @@ class Group limiters: => for k,v of @instances then { key: k, limiter: v } keys: => Object.keys @instances - startAutoCleanup: => - @stopAutoCleanup() + _startAutoCleanup: => + clearInterval @interval (@interval = setInterval => time = Date.now() for k,v of @instances - try - check = await v._store.__groupCheck__() - if (check + @timeout) < time then @deleteKey k - catch e - v._trigger "error", [e] + try if await v._store.__groupCheck__(time) then @deleteKey k + catch e then v.Events.trigger "error", [e] , (@timeout / 2)).unref?() - stopAutoCleanup: => clearInterval @interval updateSettings: (options={}) => parser.overwrite options, @defaults, @ - @startAutoCleanup() if options.timeout? + @_startAutoCleanup() if options.timeout? module.exports = Group diff --git a/src/Local.coffee b/src/Local.coffee index 9365397..0dd6dd9 100644 --- a/src/Local.coffee +++ b/src/Local.coffee @@ -27,9 +27,9 @@ class Local await @yieldLoop() @_running - __groupCheck__: -> + __groupCheck__: (time) -> await @yieldLoop() - @_nextRequest + (@_nextRequest + @_groupTimeout) < time conditionsCheck: (weight) -> ((not @maxConcurrent? or @_running+weight <= @maxConcurrent) and diff --git a/src/RedisStorage.coffee b/src/RedisStorage.coffee index 62a333b..a9e10cb 100644 --- a/src/RedisStorage.coffee +++ b/src/RedisStorage.coffee @@ -7,52 +7,55 @@ libraries = get_time: lua["get_time.lua"] refresh_running: lua["refresh_running.lua"] conditions_check: lua["conditions_check.lua"] -scripts = + refresh_expiration: lua["refresh_expiration.lua"] + validate_keys: lua["validate_keys.lua"] +scriptTemplates = (id) -> init: - keys: ["b_settings", "b_running", "b_executing"] - libs: [] + keys: ["b_#{id}_settings", "b_#{id}_running", "b_#{id}_executing"] + libs: ["refresh_expiration"] code: lua["init.lua"] update_settings: - keys: ["b_settings"] - libs: [] + keys: ["b_#{id}_settings", "b_#{id}_running", "b_#{id}_executing"] + libs: ["validate_keys", "refresh_expiration"] code: lua["update_settings.lua"] running: - keys: ["b_settings", "b_running", "b_executing"] - libs: ["refresh_running"] + keys: ["b_#{id}_settings", "b_#{id}_running", "b_#{id}_executing"] + libs: ["validate_keys", "refresh_running"] code: lua["running.lua"] group_check: - keys: ["b_settings"] + keys: ["b_#{id}_settings"] libs: [] code: lua["group_check.lua"] check: - keys: ["b_settings", "b_running", "b_executing"] - libs: ["refresh_running", "conditions_check"] + keys: ["b_#{id}_settings", "b_#{id}_running", "b_#{id}_executing"] + libs: ["validate_keys", "refresh_running", "conditions_check"] code: lua["check.lua"] submit: - keys: ["b_settings", "b_running", "b_executing"] - libs: ["refresh_running", "conditions_check"] + keys: ["b_#{id}_settings", "b_#{id}_running", "b_#{id}_executing"] + libs: ["validate_keys", "refresh_running", "conditions_check", "refresh_expiration"] code: lua["submit.lua"] register: - keys: ["b_settings", "b_running", "b_executing"] - libs: ["refresh_running", "conditions_check"] + keys: ["b_#{id}_settings", "b_#{id}_running", "b_#{id}_executing"] + libs: ["validate_keys", "refresh_running", "conditions_check", "refresh_expiration"] code: lua["register.lua"] free: - keys: ["b_settings", "b_running", "b_executing"] - libs: ["refresh_running"] + keys: ["b_#{id}_settings", "b_#{id}_running", "b_#{id}_executing"] + libs: ["validate_keys", "refresh_running"] code: lua["free.lua"] current_reservoir: - keys: ["b_settings"] - libs: [] + keys: ["b_#{id}_settings"] + libs: ["validate_keys"] code: lua["current_reservoir.lua"] increment_reservoir: - keys: ["b_settings"] - libs: [] + keys: ["b_#{id}_settings"] + libs: ["validate_keys"] code: lua["increment_reservoir.lua"] class RedisStorage constructor: (@instance, initSettings, options) -> r = require redis = r do -> ["r", "e", "d", "i", "s"].join("") # Obfuscated or else Webpack/Angular will try to inline the optional redis module + @scripts = scriptTemplates @instance.id parser.load options, options, @ @client = redis.createClient @clientOptions @subClient = redis.createClient @clientOptions @@ -86,6 +89,7 @@ class RedisStorage initSettings.running = 0 initSettings.unblockTime = 0 initSettings.version = @instance.version + initSettings.groupTimeout = @_groupTimeout args = @prepareObject(initSettings) args.unshift (if options.clearDatastore then 1 else 0) @@ -101,15 +105,15 @@ class RedisStorage loadScript: (name) -> new @Promise (resolve, reject) => - payload = scripts[name].libs.map (lib) -> libraries[lib] - .join("\n") + scripts[name].code + payload = @scripts[name].libs.map (lib) -> libraries[lib] + .join("\n") + @scripts[name].code @client.multi([["script", "load", payload]]).exec (err, replies) => if err? then return reject err @shas[name] = replies[0] return resolve replies[0] - loadAll: => @Promise.all(for k, v of scripts then @loadScript k) + loadAll: => @Promise.all(for k, v of @scripts then @loadScript k) prepareArray: (arr) -> arr.map (x) -> if x? then x.toString() else "" @@ -120,13 +124,18 @@ class RedisStorage runScript: (name, args) -> if !@isReady then @Promise.reject new BottleneckError "This limiter is not done connecting to Redis yet. Wait for the 'ready' event to be triggered before submitting requests." - else new @Promise (resolve, reject) => - script = scripts[name] - arr = [@shas[name], script.keys.length].concat script.keys, args, (err, replies) -> - if err? then return reject err - return resolve replies - @instance.Events.trigger "debug", ["Calling Redis script: #{name}.lua", args] - @client.evalsha.bind(@client).apply {}, arr + else + script = @scripts[name] + new @Promise (resolve, reject) => + arr = [@shas[name], script.keys.length].concat script.keys, args, (err, replies) -> + if err? then return reject err + return resolve replies + @instance.Events.trigger "debug", ["Calling Redis script: #{name}.lua", args] + @client.evalsha.bind(@client).apply {}, arr + .catch (e) => + if e.message == "SETTINGS_KEY_NOT_FOUND" + then @Promise.reject new BottleneckError "Bottleneck limiter (id: '#{@instance.id}') could not find the Redis key it needs to complete this action (key '#{script.keys[0]}'), was it deleted?#{if @_groupTimeout? then ' Note: This limiter is in a Group, it could have been garbage collected.' else ''}" + else @Promise.reject e convertBool: (b) -> !!b @@ -134,7 +143,7 @@ class RedisStorage __running__: -> await @runScript "running", [Date.now()] - __groupCheck__: -> parseInt (await @runScript "group_check", []), 10 + __groupCheck__: -> @convertBool await @runScript "group_check", [] __incrementReservoir__: (incr) -> await @runScript "increment_reservoir", [incr] diff --git a/src/redis/group_check.lua b/src/redis/group_check.lua index 1c75625..e881389 100644 --- a/src/redis/group_check.lua +++ b/src/redis/group_check.lua @@ -1,3 +1,3 @@ local settings_key = KEYS[1] -return redis.call('hget', settings_key, 'nextRequest') +return not (redis.call('exists', settings_key) == 1) diff --git a/src/redis/init.lua b/src/redis/init.lua index beb74e1..c767ffa 100644 --- a/src/redis/init.lua +++ b/src/redis/init.lua @@ -18,4 +18,7 @@ if redis.call('exists', settings_key) == 0 then redis.call(unpack(args)) end +local groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout')) +refresh_expiration(executing_key, running_key, settings_key, 0, 0, groupTimeout) + return {} diff --git a/src/redis/refresh_expiration.lua b/src/redis/refresh_expiration.lua new file mode 100644 index 0000000..9e665f3 --- /dev/null +++ b/src/redis/refresh_expiration.lua @@ -0,0 +1,11 @@ +local refresh_expiration = function (executing_key, running_key, settings_key, now, nextRequest, groupTimeout) + + if groupTimeout ~= nil then + local ttl = (nextRequest + groupTimeout) - now + + redis.call('pexpire', executing_key, ttl) + redis.call('pexpire', running_key, ttl) + redis.call('pexpire', settings_key, ttl) + end + +end diff --git a/src/redis/register.lua b/src/redis/register.lua index 8177e98..4c144c5 100644 --- a/src/redis/register.lua +++ b/src/redis/register.lua @@ -12,12 +12,14 @@ local settings = redis.call('hmget', settings_key, 'maxConcurrent', 'reservoir', 'nextRequest', - 'minTime' + 'minTime', + 'groupTimeout' ) local maxConcurrent = tonumber(settings[1]) local reservoir = tonumber(settings[2]) local nextRequest = tonumber(settings[3]) local minTime = tonumber(settings[4]) +local groupTimeout = tonumber(settings[5]) if conditions_check(weight, maxConcurrent, running, reservoir) then @@ -28,19 +30,22 @@ if conditions_check(weight, maxConcurrent, running, reservoir) then redis.call('hincrby', settings_key, 'running', weight) local wait = math.max(nextRequest - now, 0) + local newNextRequest = now + wait + minTime if reservoir == nil then redis.call('hset', settings_key, - 'nextRequest', now + wait + minTime + 'nextRequest', newNextRequest ) else reservoir = reservoir - weight redis.call('hmset', settings_key, 'reservoir', reservoir, - 'nextRequest', now + wait + minTime + 'nextRequest', newNextRequest ) end + refresh_expiration(executing_key, running_key, settings_key, now, newNextRequest, groupTimeout) + return {true, wait, reservoir} else diff --git a/src/redis/submit.lua b/src/redis/submit.lua index 6e27dbf..1dcc1a0 100644 --- a/src/redis/submit.lua +++ b/src/redis/submit.lua @@ -15,7 +15,8 @@ local settings = redis.call('hmget', settings_key, 'strategy', 'unblockTime', 'penalty', - 'minTime' + 'minTime', + 'groupTimeout' ) local maxConcurrent = tonumber(settings[1]) local highWater = tonumber(settings[2]) @@ -25,6 +26,7 @@ local strategy = tonumber(settings[5]) local unblockTime = tonumber(settings[6]) local penalty = tonumber(settings[7]) local minTime = tonumber(settings[8]) +local groupTimeout = tonumber(settings[9]) if maxConcurrent ~= nil and weight > maxConcurrent then return redis.error_reply('OVERWEIGHT:'..weight..':'..maxConcurrent) @@ -49,10 +51,14 @@ if blocked then end end + local newNextRequest = unblockTime + minTime + redis.call('hmset', settings_key, 'unblockTime', now + computedPenalty, - 'nextRequest', unblockTime + minTime + 'nextRequest', newNextRequest ) + + refresh_expiration(executing_key, running_key, settings_key, now, newNextRequest, groupTimeout) end return {reachedHWM, blocked, strategy} diff --git a/src/redis/update_settings.lua b/src/redis/update_settings.lua index 3f26c7f..ae10080 100644 --- a/src/redis/update_settings.lua +++ b/src/redis/update_settings.lua @@ -1,4 +1,6 @@ local settings_key = KEYS[1] +local running_key = KEYS[2] +local executing_key = KEYS[3] local args = {'hmset', settings_key} @@ -8,4 +10,7 @@ end redis.call(unpack(args)) +local groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout')) +refresh_expiration(executing_key, running_key, settings_key, 0, 0, groupTimeout) + return {} diff --git a/src/redis/validate_keys.lua b/src/redis/validate_keys.lua new file mode 100644 index 0000000..ba7cd8a --- /dev/null +++ b/src/redis/validate_keys.lua @@ -0,0 +1,5 @@ +local settings_key = KEYS[1] + +if not (redis.call('exists', settings_key) == 1) then + return redis.error_reply('SETTINGS_KEY_NOT_FOUND') +end diff --git a/test/group.js b/test/group.js index 64bbfb8..83416e7 100644 --- a/test/group.js +++ b/test/group.js @@ -11,7 +11,7 @@ describe('Group', function () { } }) - it('Should make Groups', function (done) { + it('Should create limiters', function (done) { c = makeTest() var group = new Bottleneck.Group({ maxConcurrent: 1, minTime: 100 @@ -46,6 +46,24 @@ describe('Group', function () { }, null) }) + it('Should set up the limiter IDs', function () { + c = makeTest() + var group = new Bottleneck.Group({ + maxConcurrent: 1, minTime: 100 + }) + + c.mustEqual(group.key('A').id, 'group-key-A') + c.mustEqual(group.key('B').id, 'group-key-B') + c.mustEqual(group.key('XYZ').id, 'group-key-XYZ') + + var ids = group.keys().map(function (key) { + var limiter = group.key(key) + c.mustEqual(limiter._store._groupTimeout, group.timeout) + return limiter.id + }) + c.mustEqual(ids.sort(), ['group-key-A', 'group-key-B', 'group-key-XYZ']) + }) + it('Should pass new limiter to \'created\' event', function () { c = makeTest() var group = new Bottleneck.Group({ @@ -76,7 +94,7 @@ describe('Group', function () { return Promise.all(promises) .then(function () { - c.mustEqual() + c.mustEqual(keys, ids) }) }) @@ -164,17 +182,16 @@ describe('Group', function () { }) it('Should call autocleanup', function () { - c = makeTest() var KEY = 'test-key' var group = new Bottleneck.Group({ maxConcurrent: 1 }) group.updateSettings({ timeout: 50 }) + c = makeTest({ id: 'something', _groupTimeout: group.timeout }) return c.limiter.ready() .then(function () { group.instances[KEY] = c.limiter - return group.key(KEY).schedule(function () { return Promise.resolve() }) diff --git a/test/redis.js b/test/redis.js index 03aef06..0764eb4 100644 --- a/test/redis.js +++ b/test/redis.js @@ -53,7 +53,8 @@ if (process.env.DATASTORE === 'redis') { // Also check that the version gets set return new Promise(function (resolve, reject) { - limiter2._store.client.hget('b_settings', 'version', function (err, data) { + var settings_key = limiter2._store.scripts.update_settings.keys[0] + limiter2._store.client.hget(settings_key, 'version', function (err, data) { if (err != null) return reject(err) c.mustEqual(data, packagejson.version) return resolve() @@ -197,21 +198,149 @@ if (process.env.DATASTORE === 'redis') { }) }) - it('Should not allow Groups (will be implemented later)', function (done) { + it('Should use the limiter ID to build Redis keys', function () { c = makeTest() - var message = 'Groups do not currently support Clustering. This will be implemented in a future version. Please open an issue at https://github.com/SGrondin/bottleneck/issues if you would like this feature to be implemented.' + var randomId = c.limiter._randomIndex() + var limiter = new Bottleneck({ id: randomId, datastore: 'redis', clearDatastore: true }) - try { - var group = new Bottleneck.Group({ - datastore: 'redis', - clearDatastore: true + return limiter.ready() + .then(function () { + var settings_key = limiter._store.scripts.update_settings.keys[0] + assert(settings_key.indexOf(randomId) > 0) + + return new Promise(function (resolve, reject) { + limiter._store.client.del(settings_key, function (err, data) { + if (err != null) return reject(err) + return resolve(data) + }) + }) + + }) + .then(function (deleted) { + c.mustEqual(deleted, 1) + limiter.disconnect(false) + }) + }) + + it('Should fail when Redis data is missing', function (done) { + c = makeTest() + var limiter = new Bottleneck({ datastore: 'redis', clearDatastore: true }) + + limiter.ready() + .then(function () { + return limiter.running() + }) + .then(function (running) { + c.mustEqual(running, 0) + var settings_key = limiter._store.scripts.update_settings.keys[0] + + return new Promise(function (resolve, reject) { + limiter._store.client.del(settings_key, function (err, data) { + if (err != null) return reject(err) + c.mustEqual(data, 1) // Should be 1, since 1 key should have been deleted + return resolve(data) + }) }) - done(new Error('Should not allow Groups with Clustering')) - } catch (e) { - if (e.message === message) { - done() - } + + }) + .then(function (deleted) { + c.mustEqual(deleted, 1) + return limiter.running() + }) + .catch(function (err) { + c.mustEqual(err.message, 'Bottleneck limiter (id: \'\') could not find the Redis key it needs to complete this action (key \'b__settings\'), was it deleted?') + limiter.disconnect(false) + done() + }) + }) + + it('Should support Groups and expire Redis keys', function () { + c = makeTest() + var group = new Bottleneck.Group({ + datastore: 'redis', + clearDatastore: true, + minTime: 50 + }, { timeout: 200 }) + var limiter1 + var limiter2 + var limiter3 + + var limiterKeys = function (limiter) { + return limiter._store.scripts.init.keys + } + var keysExist = function (keys) { + return new Promise(function (resolve, reject) { + return c.limiter._store.client.exists(...keys, function (err, data) { + if (err != null) { + return reject(err) + } + return resolve(data) + }) + }) + } + var t0 = Date.now() + var results = {} + var job = function (x) { + results[x] = Date.now() - t0 + return Promise.resolve() } + + return c.limiter.ready() + .then(function () { + limiter1 = group.key('one') + limiter2 = group.key('two') + limiter3 = group.key('three') + + return Promise.all([limiter1.ready(), limiter2.ready(), limiter3.ready()]) + }) + .then(function () { + return keysExist( + [].concat(limiterKeys(limiter1), limiterKeys(limiter2), limiterKeys(limiter3)) + ) + }) + .then(function (exist) { + c.mustEqual(exist, 3) + return Promise.all([ + limiter1.schedule(job, 'a'), + limiter1.schedule(job, 'b'), + limiter1.schedule(job, 'c'), + limiter2.schedule(job, 'd'), + limiter2.schedule(job, 'e'), + limiter3.schedule(job, 'f') + ]) + }) + .then(function () { + c.mustEqual(Object.keys(results).length, 6) + assert(results.a < results.b) + assert(results.b < results.c) + assert(results.b - results.a >= 40) + assert(results.c - results.b >= 40) + + assert(results.d < results.e) + assert(results.e - results.d >= 40) + + assert(Math.abs(results.a - results.d) <= 10) + assert(Math.abs(results.d - results.f) <= 10) + assert(Math.abs(results.b - results.e) <= 10) + + return c.wait(300) + }) + .then(function () { + return keysExist( + [].concat(limiterKeys(limiter1), limiterKeys(limiter2), limiterKeys(limiter3)) + ) + }) + .then(function (exist) { + c.mustEqual(exist, 0) + }) + .then(function () { + group.keys().forEach(function (key) { + group.key(key).disconnect(false) + }) + }) + + }) + }) }