From 1a1bd6240531755d4241059a442557d548eba6dd Mon Sep 17 00:00:00 2001 From: Simon Grondin Date: Sat, 1 Dec 2018 17:06:27 -0600 Subject: [PATCH] Fix key expiration issues with Groups+Clustering --- README.md | 2 +- bottleneck.d.ts | 5 +- bottleneck.d.ts.ejs | 5 +- es5.js | 424 +++++++++++++++++++++------------ lib/Group.js | 24 +- lib/LocalDatastore.js | 114 +++++---- lib/RedisDatastore.js | 34 ++- lib/Scripts.js | 7 + lib/lua.json | 1 + src/Group.coffee | 7 +- src/LocalDatastore.coffee | 5 + src/RedisDatastore.coffee | 5 +- src/Scripts.coffee | 4 + src/redis/group_delete_key.lua | 5 + test.ts | 5 +- test/cluster.js | 78 ++++++ test/group.js | 13 +- 17 files changed, 509 insertions(+), 229 deletions(-) create mode 100644 src/redis/group_delete_key.lua diff --git a/README.md b/README.md index 7bc9d59..cc5101c 100644 --- a/README.md +++ b/README.md @@ -521,7 +521,7 @@ __deleteKey()__ * `str`: The key for the limiter to delete. -Manually deletes the limiter at the specified key. This can be useful when the auto cleanup is turned off. +Manually deletes the limiter at the specified key. When using Clustering, the Redis data is immediately deleted and the other Groups in the Cluster will eventually delete their local key automatically, unless it is still being used. __keys()__ diff --git a/bottleneck.d.ts b/bottleneck.d.ts index 0eed481..e51b5b1 100644 --- a/bottleneck.d.ts +++ b/bottleneck.d.ts @@ -292,10 +292,11 @@ declare module "bottleneck" { updateSettings(options: Bottleneck.ConstructorOptions): void; /** - * Deletes the limiter for the given key + * Deletes the limiter for the given key. + * Returns true if a key was deleted. * @param str - The key */ - deleteKey(str: string): Promise; + deleteKey(str: string): Promise; /** * Disconnects the underlying redis clients, unless the Group was created with the `connection` option. diff --git a/bottleneck.d.ts.ejs b/bottleneck.d.ts.ejs index d31f231..fdca344 100644 --- a/bottleneck.d.ts.ejs +++ b/bottleneck.d.ts.ejs @@ -292,10 +292,11 @@ declare module "bottleneck" { updateSettings(options: Bottleneck.ConstructorOptions): void; /** - * Deletes the limiter for the given key + * Deletes the limiter for the given key. + * Returns true if a key was deleted. * @param str - The key */ - deleteKey(str: string): Promise; + deleteKey(str: string): Promise; /** * Disconnects the underlying redis clients, unless the Group was created with the `connection` option. diff --git a/es5.js b/es5.js index a1a521d..b666d69 100644 --- a/es5.js +++ b/es5.js @@ -1370,10 +1370,33 @@ }() }, { key: "__disconnect__", - value: function __disconnect__(flush) { - clearInterval(this.heartbeat); - return this.Promise.resolve(); - } + value: function () { + var _disconnect__ = _asyncToGenerator( + /*#__PURE__*/ + regeneratorRuntime.mark(function _callee2(flush) { + return regeneratorRuntime.wrap(function _callee2$(_context2) { + while (1) { + switch (_context2.prev = _context2.next) { + case 0: + _context2.next = 2; + return this.yieldLoop(); + + case 2: + clearInterval(this.heartbeat); + return _context2.abrupt("return", this.Promise.resolve()); + + case 4: + case "end": + return _context2.stop(); + } + } + }, _callee2, this); + })); + + return function __disconnect__(_x2) { + return _disconnect__.apply(this, arguments); + }; + }() }, { key: "yieldLoop", value: function yieldLoop() { @@ -1393,12 +1416,12 @@ value: function () { var _updateSettings__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee2(options) { - return regeneratorRuntime.wrap(function _callee2$(_context2) { + regeneratorRuntime.mark(function _callee3(options) { + return regeneratorRuntime.wrap(function _callee3$(_context3) { while (1) { - switch (_context2.prev = _context2.next) { + switch (_context3.prev = _context3.next) { case 0: - _context2.next = 2; + _context3.next = 2; return this.yieldLoop(); case 2: @@ -1406,17 +1429,17 @@ this.instance._drainAll(this.computeCapacity()); - return _context2.abrupt("return", true); + return _context3.abrupt("return", true); case 5: case "end": - return _context2.stop(); + return _context3.stop(); } } - }, _callee2, this); + }, _callee3, this); })); - return function __updateSettings__(_x2) { + return function __updateSettings__(_x3) { return _updateSettings__.apply(this, arguments); }; }() @@ -1425,23 +1448,23 @@ value: function () { var _running__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee3() { - return regeneratorRuntime.wrap(function _callee3$(_context3) { + regeneratorRuntime.mark(function _callee4() { + return regeneratorRuntime.wrap(function _callee4$(_context4) { while (1) { - switch (_context3.prev = _context3.next) { + switch (_context4.prev = _context4.next) { case 0: - _context3.next = 2; + _context4.next = 2; return this.yieldLoop(); case 2: - return _context3.abrupt("return", this._running); + return _context4.abrupt("return", this._running); case 3: case "end": - return _context3.stop(); + return _context4.stop(); } } - }, _callee3, this); + }, _callee4, this); })); return function __running__() { @@ -1453,23 +1476,23 @@ value: function () { var _done__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee4() { - return regeneratorRuntime.wrap(function _callee4$(_context4) { + regeneratorRuntime.mark(function _callee5() { + return regeneratorRuntime.wrap(function _callee5$(_context5) { while (1) { - switch (_context4.prev = _context4.next) { + switch (_context5.prev = _context5.next) { case 0: - _context4.next = 2; + _context5.next = 2; return this.yieldLoop(); case 2: - return _context4.abrupt("return", this._done); + return _context5.abrupt("return", this._done); case 3: case "end": - return _context4.stop(); + return _context5.stop(); } } - }, _callee4, this); + }, _callee5, this); })); return function __done__() { @@ -1481,29 +1504,57 @@ value: function () { var _groupCheck__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee5(time) { - return regeneratorRuntime.wrap(function _callee5$(_context5) { + regeneratorRuntime.mark(function _callee6(time) { + return regeneratorRuntime.wrap(function _callee6$(_context6) { while (1) { - switch (_context5.prev = _context5.next) { + switch (_context6.prev = _context6.next) { case 0: - _context5.next = 2; + _context6.next = 2; return this.yieldLoop(); case 2: - return _context5.abrupt("return", this._nextRequest + this.timeout < time); + return _context6.abrupt("return", this._nextRequest + this.timeout < time); case 3: case "end": - return _context5.stop(); + return _context6.stop(); } } - }, _callee5, this); + }, _callee6, this); })); - return function __groupCheck__(_x3) { + return function __groupCheck__(_x4) { return _groupCheck__.apply(this, arguments); }; }() + }, { + key: "__groupDeleteKey__", + value: function () { + var _groupDeleteKey__ = _asyncToGenerator( + /*#__PURE__*/ + regeneratorRuntime.mark(function _callee7() { + return regeneratorRuntime.wrap(function _callee7$(_context7) { + while (1) { + switch (_context7.prev = _context7.next) { + case 0: + _context7.next = 2; + return this.yieldLoop(); + + case 2: + return _context7.abrupt("return", 1); + + case 3: + case "end": + return _context7.stop(); + } + } + }, _callee7, this); + })); + + return function __groupDeleteKey__() { + return _groupDeleteKey__.apply(this, arguments); + }; + }() }, { key: "computeCapacity", value: function computeCapacity() { @@ -1534,13 +1585,13 @@ value: function () { var _incrementReservoir__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee6(incr) { + regeneratorRuntime.mark(function _callee8(incr) { var reservoir; - return regeneratorRuntime.wrap(function _callee6$(_context6) { + return regeneratorRuntime.wrap(function _callee8$(_context8) { while (1) { - switch (_context6.prev = _context6.next) { + switch (_context8.prev = _context8.next) { case 0: - _context6.next = 2; + _context8.next = 2; return this.yieldLoop(); case 2: @@ -1548,17 +1599,17 @@ this.instance._drainAll(this.computeCapacity()); - return _context6.abrupt("return", reservoir); + return _context8.abrupt("return", reservoir); case 5: case "end": - return _context6.stop(); + return _context8.stop(); } } - }, _callee6, this); + }, _callee8, this); })); - return function __incrementReservoir__(_x4) { + return function __incrementReservoir__(_x5) { return _incrementReservoir__.apply(this, arguments); }; }() @@ -1567,23 +1618,23 @@ value: function () { var _currentReservoir__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee7() { - return regeneratorRuntime.wrap(function _callee7$(_context7) { + regeneratorRuntime.mark(function _callee9() { + return regeneratorRuntime.wrap(function _callee9$(_context9) { while (1) { - switch (_context7.prev = _context7.next) { + switch (_context9.prev = _context9.next) { case 0: - _context7.next = 2; + _context9.next = 2; return this.yieldLoop(); case 2: - return _context7.abrupt("return", this.storeOptions.reservoir); + return _context9.abrupt("return", this.storeOptions.reservoir); case 3: case "end": - return _context7.stop(); + return _context9.stop(); } } - }, _callee7, this); + }, _callee9, this); })); return function __currentReservoir__() { @@ -1605,28 +1656,28 @@ value: function () { var _check__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee8(weight) { + regeneratorRuntime.mark(function _callee10(weight) { var now; - return regeneratorRuntime.wrap(function _callee8$(_context8) { + return regeneratorRuntime.wrap(function _callee10$(_context10) { while (1) { - switch (_context8.prev = _context8.next) { + switch (_context10.prev = _context10.next) { case 0: - _context8.next = 2; + _context10.next = 2; return this.yieldLoop(); case 2: now = Date.now(); - return _context8.abrupt("return", this.check(weight, now)); + return _context10.abrupt("return", this.check(weight, now)); case 4: case "end": - return _context8.stop(); + return _context10.stop(); } } - }, _callee8, this); + }, _callee10, this); })); - return function __check__(_x5) { + return function __check__(_x6) { return _check__.apply(this, arguments); }; }() @@ -1635,20 +1686,20 @@ value: function () { var _register__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee9(index, weight, expiration) { + regeneratorRuntime.mark(function _callee11(index, weight, expiration) { var now, wait; - return regeneratorRuntime.wrap(function _callee9$(_context9) { + return regeneratorRuntime.wrap(function _callee11$(_context11) { while (1) { - switch (_context9.prev = _context9.next) { + switch (_context11.prev = _context11.next) { case 0: - _context9.next = 2; + _context11.next = 2; return this.yieldLoop(); case 2: now = Date.now(); if (!this.conditionsCheck(weight)) { - _context9.next = 11; + _context11.next = 11; break; } @@ -1660,26 +1711,26 @@ wait = Math.max(this._nextRequest - now, 0); this._nextRequest = now + wait + this.storeOptions.minTime; - return _context9.abrupt("return", { + return _context11.abrupt("return", { success: true, wait: wait, reservoir: this.storeOptions.reservoir }); case 11: - return _context9.abrupt("return", { + return _context11.abrupt("return", { success: false }); case 12: case "end": - return _context9.stop(); + return _context11.stop(); } } - }, _callee9, this); + }, _callee11, this); })); - return function __register__(_x6, _x7, _x8) { + return function __register__(_x7, _x8, _x9) { return _register__.apply(this, arguments); }; }() @@ -1693,18 +1744,18 @@ value: function () { var _submit__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee10(queueLength, weight) { + regeneratorRuntime.mark(function _callee12(queueLength, weight) { var blocked, now, reachedHWM; - return regeneratorRuntime.wrap(function _callee10$(_context10) { + return regeneratorRuntime.wrap(function _callee12$(_context12) { while (1) { - switch (_context10.prev = _context10.next) { + switch (_context12.prev = _context12.next) { case 0: - _context10.next = 2; + _context12.next = 2; return this.yieldLoop(); case 2: if (!(this.storeOptions.maxConcurrent != null && weight > this.storeOptions.maxConcurrent)) { - _context10.next = 4; + _context12.next = 4; break; } @@ -1722,7 +1773,7 @@ this.instance._dropAllQueued(); } - return _context10.abrupt("return", { + return _context12.abrupt("return", { reachedHWM: reachedHWM, blocked: blocked, strategy: this.storeOptions.strategy @@ -1730,13 +1781,13 @@ case 9: case "end": - return _context10.stop(); + return _context12.stop(); } } - }, _callee10, this); + }, _callee12, this); })); - return function __submit__(_x9, _x10) { + return function __submit__(_x10, _x11) { return _submit__.apply(this, arguments); }; }() @@ -1745,12 +1796,12 @@ value: function () { var _free__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee11(index, weight) { - return regeneratorRuntime.wrap(function _callee11$(_context11) { + regeneratorRuntime.mark(function _callee13(index, weight) { + return regeneratorRuntime.wrap(function _callee13$(_context13) { while (1) { - switch (_context11.prev = _context11.next) { + switch (_context13.prev = _context13.next) { case 0: - _context11.next = 2; + _context13.next = 2; return this.yieldLoop(); case 2: @@ -1759,19 +1810,19 @@ this.instance._drainAll(this.computeCapacity()); - return _context11.abrupt("return", { + return _context13.abrupt("return", { running: this._running }); case 6: case "end": - return _context11.stop(); + return _context13.stop(); } } - }, _callee11, this); + }, _callee13, this); })); - return function __free__(_x11, _x12) { + return function __free__(_x12, _x13) { return _free__.apply(this, arguments); }; }() @@ -1790,6 +1841,7 @@ "free.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal now = tonumber(ARGV[1])\nlocal index = ARGV[2]\n\nredis.call('zadd', executing_key, 0, index)\n\nreturn refresh_capacity(executing_key, running_key, settings_key, now, false)[2]\n", "get_time.lua": "redis.replicate_commands()\n\nlocal get_time = function ()\n local time = redis.call('time')\n\n return tonumber(time[1]..string.sub(time[2], 1, 3))\nend\n", "group_check.lua": "local settings_key = KEYS[1]\n\nlocal now = tonumber(ARGV[1])\n\nreturn not (redis.call('exists', settings_key) == 1)\n", + "group_delete_key.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nreturn redis.call('del', settings_key, running_key, executing_key)\n", "heartbeat.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal now = tonumber(ARGV[1])\n\nrefresh_capacity(executing_key, running_key, settings_key, now, false)\n", "increment_reservoir.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal now = tonumber(ARGV[1])\nlocal incr = tonumber(ARGV[2])\n\nredis.call('hincrby', settings_key, 'reservoir', incr)\n\nlocal reservoir = refresh_capacity(executing_key, running_key, settings_key, now, true)[3]\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(executing_key, running_key, settings_key, 0, 0, groupTimeout)\n\nreturn reservoir\n", "init.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal now = tonumber(ARGV[1])\nlocal clear = tonumber(ARGV[2])\nlocal limiter_version = ARGV[3]\n\nif clear == 1 then\n redis.call('del', settings_key, running_key, executing_key)\nend\n\nif redis.call('exists', settings_key) == 0 then\n -- Create\n local args = {'hmset', settings_key}\n\n for i = 4, #ARGV do\n table.insert(args, ARGV[i])\n end\n\n redis.call(unpack(args))\n redis.call('hmset', settings_key,\n 'nextRequest', now,\n 'lastReservoirRefresh', now,\n 'running', 0,\n 'done', 0,\n 'unblockTime', 0\n )\n\nelse\n -- Apply migrations\n local current_version = redis.call('hget', settings_key, 'version')\n if current_version ~= limiter_version then\n local version_digits = {}\n for k, v in string.gmatch(current_version, \"([^.]+)\") do\n table.insert(version_digits, tonumber(k))\n end\n\n -- 2.10.0\n if version_digits[2] < 10 then\n redis.call('hsetnx', settings_key, 'reservoirRefreshInterval', '')\n redis.call('hsetnx', settings_key, 'reservoirRefreshAmount', '')\n redis.call('hsetnx', settings_key, 'lastReservoirRefresh', '')\n redis.call('hsetnx', settings_key, 'done', 0)\n redis.call('hset', settings_key, 'version', '2.10.0')\n end\n\n -- 2.11.1\n if version_digits[2] < 11 and version_digits[3] < 1 then\n if redis.call('hstrlen', settings_key, 'lastReservoirRefresh') == 0 then\n redis.call('hmset', settings_key,\n 'lastReservoirRefresh', now,\n 'version', '2.11.1'\n )\n end\n end\n end\n\n refresh_capacity(executing_key, running_key, settings_key, now, false)\nend\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(executing_key, running_key, settings_key, 0, 0, groupTimeout)\n\nreturn {}\n", @@ -1860,6 +1912,13 @@ libs: [], code: lua$2["group_check.lua"] }, + group_delete_key: { + keys: function keys(id) { + return ["b_".concat(id, "_settings"), "b_".concat(id, "_running"), "b_".concat(id, "_executing")]; + }, + libs: [], + code: lua$2["group_delete_key.lua"] + }, check: { keys: function keys(id) { return ["b_".concat(id, "_settings"), "b_".concat(id, "_running"), "b_".concat(id, "_executing")]; @@ -2472,10 +2531,12 @@ }); return _this2.connection.__scriptFn__(name).apply({}, arr); }).catch(function (e) { - if (e.message === "SETTINGS_KEY_NOT_FOUND") { + if (e.message === "SETTINGS_KEY_NOT_FOUND" && name !== "heartbeat") { return _this2.runScript("init", _this2.prepareInitSettings(false)).then(function () { return _this2.runScript(name, args); }); + } else if (name === "heartbeat") { + return _this2.Promise.resolve(); } else { return _this2.Promise.reject(e); } @@ -2604,6 +2665,34 @@ return _groupCheck__.apply(this, arguments); }; }() + }, { + key: "__groupDeleteKey__", + value: function () { + var _groupDeleteKey__ = _asyncToGenerator( + /*#__PURE__*/ + regeneratorRuntime.mark(function _callee5() { + return regeneratorRuntime.wrap(function _callee5$(_context5) { + while (1) { + switch (_context5.prev = _context5.next) { + case 0: + _context5.next = 2; + return this.runScript("group_delete_key", []); + + case 2: + return _context5.abrupt("return", _context5.sent); + + case 3: + case "end": + return _context5.stop(); + } + } + }, _callee5, this); + })); + + return function __groupDeleteKey__() { + return _groupDeleteKey__.apply(this, arguments); + }; + }() }, { key: "__incrementReservoir__", value: function __incrementReservoir__(incr) { @@ -2619,25 +2708,25 @@ value: function () { var _check__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee5(weight) { - return regeneratorRuntime.wrap(function _callee5$(_context5) { + regeneratorRuntime.mark(function _callee6(weight) { + return regeneratorRuntime.wrap(function _callee6$(_context6) { while (1) { - switch (_context5.prev = _context5.next) { + switch (_context6.prev = _context6.next) { case 0: - _context5.t0 = this; - _context5.next = 3; + _context6.t0 = this; + _context6.next = 3; return this.runScript("check", this.prepareArray([weight])); case 3: - _context5.t1 = _context5.sent; - return _context5.abrupt("return", _context5.t0.convertBool.call(_context5.t0, _context5.t1)); + _context6.t1 = _context6.sent; + return _context6.abrupt("return", _context6.t0.convertBool.call(_context6.t0, _context6.t1)); case 5: case "end": - return _context5.stop(); + return _context6.stop(); } } - }, _callee5, this); + }, _callee6, this); })); return function __check__(_x5) { @@ -2649,23 +2738,23 @@ value: function () { var _register__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee6(index, weight, expiration) { + regeneratorRuntime.mark(function _callee7(index, weight, expiration) { var reservoir, success, wait, _ref3, _ref4; - return regeneratorRuntime.wrap(function _callee6$(_context6) { + return regeneratorRuntime.wrap(function _callee7$(_context7) { while (1) { - switch (_context6.prev = _context6.next) { + switch (_context7.prev = _context7.next) { case 0: - _context6.next = 2; + _context7.next = 2; return this.runScript("register", this.prepareArray([index, weight, expiration])); case 2: - _ref3 = _context6.sent; + _ref3 = _context7.sent; _ref4 = _slicedToArray(_ref3, 3); success = _ref4[0]; wait = _ref4[1]; reservoir = _ref4[2]; - return _context6.abrupt("return", { + return _context7.abrupt("return", { success: this.convertBool(success), wait: wait, reservoir: reservoir @@ -2673,10 +2762,10 @@ case 8: case "end": - return _context6.stop(); + return _context7.stop(); } } - }, _callee6, this); + }, _callee7, this); })); return function __register__(_x6, _x7, _x8) { @@ -2688,36 +2777,36 @@ value: function () { var _submit__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee7(queueLength, weight) { + regeneratorRuntime.mark(function _callee8(queueLength, weight) { var blocked, e, maxConcurrent, overweight, reachedHWM, strategy, _ref5, _ref6, _e$message$split, _e$message$split2; - return regeneratorRuntime.wrap(function _callee7$(_context7) { + return regeneratorRuntime.wrap(function _callee8$(_context8) { while (1) { - switch (_context7.prev = _context7.next) { + switch (_context8.prev = _context8.next) { case 0: - _context7.prev = 0; - _context7.next = 3; + _context8.prev = 0; + _context8.next = 3; return this.runScript("submit", this.prepareArray([queueLength, weight])); case 3: - _ref5 = _context7.sent; + _ref5 = _context8.sent; _ref6 = _slicedToArray(_ref5, 3); reachedHWM = _ref6[0]; blocked = _ref6[1]; strategy = _ref6[2]; - return _context7.abrupt("return", { + return _context8.abrupt("return", { reachedHWM: this.convertBool(reachedHWM), blocked: this.convertBool(blocked), strategy: strategy }); case 11: - _context7.prev = 11; - _context7.t0 = _context7["catch"](0); - e = _context7.t0; + _context8.prev = 11; + _context8.t0 = _context8["catch"](0); + e = _context8.t0; if (!(e.message.indexOf("OVERWEIGHT") === 0)) { - _context7.next = 23; + _context8.next = 23; break; } @@ -2733,10 +2822,10 @@ case 24: case "end": - return _context7.stop(); + return _context8.stop(); } } - }, _callee7, this, [[0, 11]]); + }, _callee8, this, [[0, 11]]); })); return function __submit__(_x9, _x10) { @@ -2748,27 +2837,27 @@ value: function () { var _free__ = _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee8(index, weight) { + regeneratorRuntime.mark(function _callee9(index, weight) { var running; - return regeneratorRuntime.wrap(function _callee8$(_context8) { + return regeneratorRuntime.wrap(function _callee9$(_context9) { while (1) { - switch (_context8.prev = _context8.next) { + switch (_context9.prev = _context9.next) { case 0: - _context8.next = 2; + _context9.next = 2; return this.runScript("free", this.prepareArray([index])); case 2: - running = _context8.sent; - return _context8.abrupt("return", { + running = _context9.sent; + return _context9.abrupt("return", { running: running }); case 4: case "end": - return _context8.stop(); + return _context9.stop(); } } - }, _callee8, this); + }, _callee9, this); })); return function __free__(_x11, _x12) { @@ -3154,13 +3243,48 @@ } }, { key: "deleteKey", - value: function deleteKey() { - var key = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : ""; - var instance; - instance = this.instances[key]; - delete this.instances[key]; - return instance != null ? instance.disconnect() : void 0; - } + value: function () { + var _deleteKey = _asyncToGenerator( + /*#__PURE__*/ + regeneratorRuntime.mark(function _callee() { + var key, + instance, + _args = arguments; + return regeneratorRuntime.wrap(function _callee$(_context) { + while (1) { + switch (_context.prev = _context.next) { + case 0: + key = _args.length > 0 && _args[0] !== undefined ? _args[0] : ""; + instance = this.instances[key]; + + if (!(instance != null)) { + _context.next = 8; + break; + } + + delete this.instances[key]; + _context.next = 6; + return instance._store.__groupDeleteKey__(); + + case 6: + _context.next = 8; + return instance.disconnect(); + + case 8: + return _context.abrupt("return", instance != null); + + case 9: + case "end": + return _context.stop(); + } + } + }, _callee, this); + })); + + return function deleteKey() { + return _deleteKey.apply(this, arguments); + }; + }() }, { key: "limiters", value: function limiters() { @@ -3194,65 +3318,65 @@ /*#__PURE__*/ _asyncToGenerator( /*#__PURE__*/ - regeneratorRuntime.mark(function _callee() { + regeneratorRuntime.mark(function _callee2() { var e, k, ref, results, time, v; - return regeneratorRuntime.wrap(function _callee$(_context) { + return regeneratorRuntime.wrap(function _callee2$(_context2) { while (1) { - switch (_context.prev = _context.next) { + switch (_context2.prev = _context2.next) { case 0: time = Date.now(); ref = _this2.instances; results = []; - _context.t0 = regeneratorRuntime.keys(ref); + _context2.t0 = regeneratorRuntime.keys(ref); case 4: - if ((_context.t1 = _context.t0()).done) { - _context.next = 23; + if ((_context2.t1 = _context2.t0()).done) { + _context2.next = 23; break; } - k = _context.t1.value; + k = _context2.t1.value; v = ref[k]; - _context.prev = 7; - _context.next = 10; + _context2.prev = 7; + _context2.next = 10; return v._store.__groupCheck__(time); case 10: - if (!_context.sent) { - _context.next = 14; + if (!_context2.sent) { + _context2.next = 14; break; } results.push(_this2.deleteKey(k)); - _context.next = 15; + _context2.next = 15; break; case 14: results.push(void 0); case 15: - _context.next = 21; + _context2.next = 21; break; case 17: - _context.prev = 17; - _context.t2 = _context["catch"](7); - e = _context.t2; + _context2.prev = 17; + _context2.t2 = _context2["catch"](7); + e = _context2.t2; results.push(v.Events.trigger("error", [e])); case 21: - _context.next = 4; + _context2.next = 4; break; case 23: - return _context.abrupt("return", results); + return _context2.abrupt("return", results); case 24: case "end": - return _context.stop(); + return _context2.stop(); } } - }, _callee, this, [[7, 17]]); + }, _callee2, this, [[7, 17]]); })), this.timeout / 2)).unref === "function" ? base.unref() : void 0; } }, { diff --git a/lib/Group.js b/lib/Group.js index 3ea8213..617083f 100644 --- a/lib/Group.js +++ b/lib/Group.js @@ -53,10 +53,20 @@ Group = function () { } deleteKey(key = "") { - var instance; - instance = this.instances[key]; - delete this.instances[key]; - return instance != null ? instance.disconnect() : void 0; + var _this = this; + + return _asyncToGenerator(function* () { + var instance; + instance = _this.instances[key]; + + if (instance != null) { + delete _this.instances[key]; + yield instance._store.__groupDeleteKey__(); + yield instance.disconnect(); + } + + return instance != null; + })(); } limiters() { @@ -80,7 +90,7 @@ Group = function () { } _startAutoCleanup() { - var _this = this; + var _this2 = this; var base; clearInterval(this.interval); @@ -89,7 +99,7 @@ Group = function () { _asyncToGenerator(function* () { var e, k, ref, results, time, v; time = Date.now(); - ref = _this.instances; + ref = _this2.instances; results = []; for (k in ref) { @@ -97,7 +107,7 @@ Group = function () { try { if (yield v._store.__groupCheck__(time)) { - results.push(_this.deleteKey(k)); + results.push(_this2.deleteKey(k)); } else { results.push(void 0); } diff --git a/lib/LocalDatastore.js b/lib/LocalDatastore.js index 59d4765..0873000 100644 --- a/lib/LocalDatastore.js +++ b/lib/LocalDatastore.js @@ -45,8 +45,13 @@ LocalDatastore = class LocalDatastore { } __disconnect__(flush) { - clearInterval(this.heartbeat); - return this.Promise.resolve(); + var _this2 = this; + + return _asyncToGenerator(function* () { + yield _this2.yieldLoop(); + clearInterval(_this2.heartbeat); + return _this2.Promise.resolve(); + })(); } yieldLoop(t = 0) { @@ -61,42 +66,51 @@ LocalDatastore = class LocalDatastore { } __updateSettings__(options) { - var _this2 = this; + var _this3 = this; return _asyncToGenerator(function* () { - yield _this2.yieldLoop(); - parser.overwrite(options, options, _this2.storeOptions); + yield _this3.yieldLoop(); + parser.overwrite(options, options, _this3.storeOptions); - _this2.instance._drainAll(_this2.computeCapacity()); + _this3.instance._drainAll(_this3.computeCapacity()); return true; })(); } __running__() { - var _this3 = this; + var _this4 = this; return _asyncToGenerator(function* () { - yield _this3.yieldLoop(); - return _this3._running; + yield _this4.yieldLoop(); + return _this4._running; })(); } __done__() { - var _this4 = this; + var _this5 = this; return _asyncToGenerator(function* () { - yield _this4.yieldLoop(); - return _this4._done; + yield _this5.yieldLoop(); + return _this5._done; })(); } __groupCheck__(time) { - var _this5 = this; + var _this6 = this; return _asyncToGenerator(function* () { - yield _this5.yieldLoop(); - return _this5._nextRequest + _this5.timeout < time; + yield _this6.yieldLoop(); + return _this6._nextRequest + _this6.timeout < time; + })(); + } + + __groupDeleteKey__() { + var _this7 = this; + + return _asyncToGenerator(function* () { + yield _this7.yieldLoop(); + return 1; })(); } @@ -124,25 +138,25 @@ LocalDatastore = class LocalDatastore { } __incrementReservoir__(incr) { - var _this6 = this; + var _this8 = this; return _asyncToGenerator(function* () { var reservoir; - yield _this6.yieldLoop(); - reservoir = _this6.storeOptions.reservoir += incr; + yield _this8.yieldLoop(); + reservoir = _this8.storeOptions.reservoir += incr; - _this6.instance._drainAll(_this6.computeCapacity()); + _this8.instance._drainAll(_this8.computeCapacity()); return reservoir; })(); } __currentReservoir__() { - var _this7 = this; + var _this9 = this; return _asyncToGenerator(function* () { - yield _this7.yieldLoop(); - return _this7.storeOptions.reservoir; + yield _this9.yieldLoop(); + return _this9.storeOptions.reservoir; })(); } @@ -155,37 +169,37 @@ LocalDatastore = class LocalDatastore { } __check__(weight) { - var _this8 = this; + var _this10 = this; return _asyncToGenerator(function* () { var now; - yield _this8.yieldLoop(); + yield _this10.yieldLoop(); now = Date.now(); - return _this8.check(weight, now); + return _this10.check(weight, now); })(); } __register__(index, weight, expiration) { - var _this9 = this; + var _this11 = this; return _asyncToGenerator(function* () { var now, wait; - yield _this9.yieldLoop(); + yield _this11.yieldLoop(); now = Date.now(); - if (_this9.conditionsCheck(weight)) { - _this9._running += weight; + if (_this11.conditionsCheck(weight)) { + _this11._running += weight; - if (_this9.storeOptions.reservoir != null) { - _this9.storeOptions.reservoir -= weight; + if (_this11.storeOptions.reservoir != null) { + _this11.storeOptions.reservoir -= weight; } - wait = Math.max(_this9._nextRequest - now, 0); - _this9._nextRequest = now + wait + _this9.storeOptions.minTime; + wait = Math.max(_this11._nextRequest - now, 0); + _this11._nextRequest = now + wait + _this11.storeOptions.minTime; return { success: true, wait, - reservoir: _this9.storeOptions.reservoir + reservoir: _this11.storeOptions.reservoir }; } else { return { @@ -200,47 +214,47 @@ LocalDatastore = class LocalDatastore { } __submit__(queueLength, weight) { - var _this10 = this; + var _this12 = this; return _asyncToGenerator(function* () { var blocked, now, reachedHWM; - yield _this10.yieldLoop(); + yield _this12.yieldLoop(); - if (_this10.storeOptions.maxConcurrent != null && weight > _this10.storeOptions.maxConcurrent) { - throw new BottleneckError(`Impossible to add a job having a weight of ${weight} to a limiter having a maxConcurrent setting of ${_this10.storeOptions.maxConcurrent}`); + if (_this12.storeOptions.maxConcurrent != null && weight > _this12.storeOptions.maxConcurrent) { + throw new BottleneckError(`Impossible to add a job having a weight of ${weight} to a limiter having a maxConcurrent setting of ${_this12.storeOptions.maxConcurrent}`); } now = Date.now(); - reachedHWM = _this10.storeOptions.highWater != null && queueLength === _this10.storeOptions.highWater && !_this10.check(weight, now); - blocked = _this10.strategyIsBlock() && (reachedHWM || _this10.isBlocked(now)); + reachedHWM = _this12.storeOptions.highWater != null && queueLength === _this12.storeOptions.highWater && !_this12.check(weight, now); + blocked = _this12.strategyIsBlock() && (reachedHWM || _this12.isBlocked(now)); if (blocked) { - _this10._unblockTime = now + _this10.computePenalty(); - _this10._nextRequest = _this10._unblockTime + _this10.storeOptions.minTime; + _this12._unblockTime = now + _this12.computePenalty(); + _this12._nextRequest = _this12._unblockTime + _this12.storeOptions.minTime; - _this10.instance._dropAllQueued(); + _this12.instance._dropAllQueued(); } return { reachedHWM, blocked, - strategy: _this10.storeOptions.strategy + strategy: _this12.storeOptions.strategy }; })(); } __free__(index, weight) { - var _this11 = this; + var _this13 = this; return _asyncToGenerator(function* () { - yield _this11.yieldLoop(); - _this11._running -= weight; - _this11._done += weight; + yield _this13.yieldLoop(); + _this13._running -= weight; + _this13._done += weight; - _this11.instance._drainAll(_this11.computeCapacity()); + _this13.instance._drainAll(_this13.computeCapacity()); return { - running: _this11._running + running: _this13._running }; })(); } diff --git a/lib/RedisDatastore.js b/lib/RedisDatastore.js index 7fbbde2..200947d 100644 --- a/lib/RedisDatastore.js +++ b/lib/RedisDatastore.js @@ -123,10 +123,12 @@ RedisDatastore = class RedisDatastore { }); return _this2.connection.__scriptFn__(name).apply({}, arr); }).catch(e => { - if (e.message === "SETTINGS_KEY_NOT_FOUND") { + if (e.message === "SETTINGS_KEY_NOT_FOUND" && name !== "heartbeat") { return _this2.runScript("init", _this2.prepareInitSettings(false)).then(() => { return _this2.runScript(name, args); }); + } else if (name === "heartbeat") { + return _this2.Promise.resolve(); } else { return _this2.Promise.reject(e); } @@ -198,6 +200,14 @@ RedisDatastore = class RedisDatastore { })(); } + __groupDeleteKey__() { + var _this5 = this; + + return _asyncToGenerator(function* () { + return yield _this5.runScript("group_delete_key", []); + })(); + } + __incrementReservoir__(incr) { return this.runScript("increment_reservoir", [incr]); } @@ -207,20 +217,20 @@ RedisDatastore = class RedisDatastore { } __check__(weight) { - var _this5 = this; + var _this6 = this; return _asyncToGenerator(function* () { - return _this5.convertBool((yield _this5.runScript("check", _this5.prepareArray([weight])))); + return _this6.convertBool((yield _this6.runScript("check", _this6.prepareArray([weight])))); })(); } __register__(index, weight, expiration) { - var _this6 = this; + var _this7 = this; return _asyncToGenerator(function* () { var reservoir, success, wait; - var _ref3 = yield _this6.runScript("register", _this6.prepareArray([index, weight, expiration])); + var _ref3 = yield _this7.runScript("register", _this7.prepareArray([index, weight, expiration])); var _ref4 = _slicedToArray(_ref3, 3); @@ -228,7 +238,7 @@ RedisDatastore = class RedisDatastore { wait = _ref4[1]; reservoir = _ref4[2]; return { - success: _this6.convertBool(success), + success: _this7.convertBool(success), wait, reservoir }; @@ -236,13 +246,13 @@ RedisDatastore = class RedisDatastore { } __submit__(queueLength, weight) { - var _this7 = this; + var _this8 = this; return _asyncToGenerator(function* () { var blocked, e, maxConcurrent, overweight, reachedHWM, strategy; try { - var _ref5 = yield _this7.runScript("submit", _this7.prepareArray([queueLength, weight])); + var _ref5 = yield _this8.runScript("submit", _this8.prepareArray([queueLength, weight])); var _ref6 = _slicedToArray(_ref5, 3); @@ -250,8 +260,8 @@ RedisDatastore = class RedisDatastore { blocked = _ref6[1]; strategy = _ref6[2]; return { - reachedHWM: _this7.convertBool(reachedHWM), - blocked: _this7.convertBool(blocked), + reachedHWM: _this8.convertBool(reachedHWM), + blocked: _this8.convertBool(blocked), strategy }; } catch (error) { @@ -274,11 +284,11 @@ RedisDatastore = class RedisDatastore { } __free__(index, weight) { - var _this8 = this; + var _this9 = this; return _asyncToGenerator(function* () { var running; - running = yield _this8.runScript("free", _this8.prepareArray([index])); + running = yield _this9.runScript("free", _this9.prepareArray([index])); return { running }; diff --git a/lib/Scripts.js b/lib/Scripts.js index 0cdc68d..d1d3b93 100644 --- a/lib/Scripts.js +++ b/lib/Scripts.js @@ -52,6 +52,13 @@ templates = { libs: [], code: lua["group_check.lua"] }, + group_delete_key: { + keys: function keys(id) { + return [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`]; + }, + libs: [], + code: lua["group_delete_key.lua"] + }, check: { keys: function keys(id) { return [`b_${id}_settings`, `b_${id}_running`, `b_${id}_executing`]; diff --git a/lib/lua.json b/lib/lua.json index cde21e2..e5e3952 100644 --- a/lib/lua.json +++ b/lib/lua.json @@ -6,6 +6,7 @@ "free.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal now = tonumber(ARGV[1])\nlocal index = ARGV[2]\n\nredis.call('zadd', executing_key, 0, index)\n\nreturn refresh_capacity(executing_key, running_key, settings_key, now, false)[2]\n", "get_time.lua": "redis.replicate_commands()\n\nlocal get_time = function ()\n local time = redis.call('time')\n\n return tonumber(time[1]..string.sub(time[2], 1, 3))\nend\n", "group_check.lua": "local settings_key = KEYS[1]\n\nlocal now = tonumber(ARGV[1])\n\nreturn not (redis.call('exists', settings_key) == 1)\n", + "group_delete_key.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nreturn redis.call('del', settings_key, running_key, executing_key)\n", "heartbeat.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal now = tonumber(ARGV[1])\n\nrefresh_capacity(executing_key, running_key, settings_key, now, false)\n", "increment_reservoir.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal now = tonumber(ARGV[1])\nlocal incr = tonumber(ARGV[2])\n\nredis.call('hincrby', settings_key, 'reservoir', incr)\n\nlocal reservoir = refresh_capacity(executing_key, running_key, settings_key, now, true)[3]\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(executing_key, running_key, settings_key, 0, 0, groupTimeout)\n\nreturn reservoir\n", "init.lua": "local settings_key = KEYS[1]\nlocal running_key = KEYS[2]\nlocal executing_key = KEYS[3]\n\nlocal now = tonumber(ARGV[1])\nlocal clear = tonumber(ARGV[2])\nlocal limiter_version = ARGV[3]\n\nif clear == 1 then\n redis.call('del', settings_key, running_key, executing_key)\nend\n\nif redis.call('exists', settings_key) == 0 then\n -- Create\n local args = {'hmset', settings_key}\n\n for i = 4, #ARGV do\n table.insert(args, ARGV[i])\n end\n\n redis.call(unpack(args))\n redis.call('hmset', settings_key,\n 'nextRequest', now,\n 'lastReservoirRefresh', now,\n 'running', 0,\n 'done', 0,\n 'unblockTime', 0\n )\n\nelse\n -- Apply migrations\n local current_version = redis.call('hget', settings_key, 'version')\n if current_version ~= limiter_version then\n local version_digits = {}\n for k, v in string.gmatch(current_version, \"([^.]+)\") do\n table.insert(version_digits, tonumber(k))\n end\n\n -- 2.10.0\n if version_digits[2] < 10 then\n redis.call('hsetnx', settings_key, 'reservoirRefreshInterval', '')\n redis.call('hsetnx', settings_key, 'reservoirRefreshAmount', '')\n redis.call('hsetnx', settings_key, 'lastReservoirRefresh', '')\n redis.call('hsetnx', settings_key, 'done', 0)\n redis.call('hset', settings_key, 'version', '2.10.0')\n end\n\n -- 2.11.1\n if version_digits[2] < 11 and version_digits[3] < 1 then\n if redis.call('hstrlen', settings_key, 'lastReservoirRefresh') == 0 then\n redis.call('hmset', settings_key,\n 'lastReservoirRefresh', now,\n 'version', '2.11.1'\n )\n end\n end\n end\n\n refresh_capacity(executing_key, running_key, settings_key, now, false)\nend\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(executing_key, running_key, settings_key, 0, 0, groupTimeout)\n\nreturn {}\n", diff --git a/src/Group.coffee b/src/Group.coffee index cfda33e..ae65e60 100644 --- a/src/Group.coffee +++ b/src/Group.coffee @@ -34,8 +34,11 @@ class Group deleteKey: (key="") => instance = @instances[key] - delete @instances[key] - instance?.disconnect() + if instance? + delete @instances[key] + await instance._store.__groupDeleteKey__() + await instance.disconnect() + instance? limiters: -> { key: k, limiter: v } for k, v of @instances diff --git a/src/LocalDatastore.coffee b/src/LocalDatastore.coffee index aa89130..a987585 100644 --- a/src/LocalDatastore.coffee +++ b/src/LocalDatastore.coffee @@ -24,6 +24,7 @@ class LocalDatastore @instance.Events.trigger "message", [message.toString()] __disconnect__: (flush) -> + await @yieldLoop() clearInterval @heartbeat @Promise.resolve() @@ -49,6 +50,10 @@ class LocalDatastore await @yieldLoop() (@_nextRequest + @timeout) < time + __groupDeleteKey__: -> + await @yieldLoop() + 1 + computeCapacity: -> { maxConcurrent, reservoir } = @storeOptions if maxConcurrent? and reservoir? then Math.min((maxConcurrent - @_running), reservoir) diff --git a/src/RedisDatastore.coffee b/src/RedisDatastore.coffee index cfd73b3..7c2ee76 100644 --- a/src/RedisDatastore.coffee +++ b/src/RedisDatastore.coffee @@ -57,9 +57,10 @@ class RedisDatastore return resolve replies @connection.__scriptFn__(name).apply {}, arr .catch (e) => - if e.message == "SETTINGS_KEY_NOT_FOUND" + if e.message == "SETTINGS_KEY_NOT_FOUND" and name != "heartbeat" @runScript("init", @prepareInitSettings(false)) .then => @runScript(name, args) + else if name == "heartbeat" then @Promise.resolve() else @Promise.reject e prepareArray: (arr) -> (if x? then x.toString() else "") for x in arr @@ -90,6 +91,8 @@ class RedisDatastore __groupCheck__: -> @convertBool await @runScript "group_check", [] + __groupDeleteKey__: -> await @runScript "group_delete_key", [] + __incrementReservoir__: (incr) -> @runScript "increment_reservoir", [incr] __currentReservoir__: -> @runScript "current_reservoir", [] diff --git a/src/Scripts.coffee b/src/Scripts.coffee index 077f1b0..65a41b5 100644 --- a/src/Scripts.coffee +++ b/src/Scripts.coffee @@ -32,6 +32,10 @@ templates = keys: (id) -> ["b_#{id}_settings"] libs: [] code: lua["group_check.lua"] + group_delete_key: + keys: (id) -> ["b_#{id}_settings", "b_#{id}_running", "b_#{id}_executing"] + libs: [] + code: lua["group_delete_key.lua"] check: keys: (id) -> ["b_#{id}_settings", "b_#{id}_running", "b_#{id}_executing"] libs: ["validate_keys", "refresh_capacity", "conditions_check"] diff --git a/src/redis/group_delete_key.lua b/src/redis/group_delete_key.lua new file mode 100644 index 0000000..e1bb73e --- /dev/null +++ b/src/redis/group_delete_key.lua @@ -0,0 +1,5 @@ +local settings_key = KEYS[1] +local running_key = KEYS[2] +local executing_key = KEYS[3] + +return redis.call('del', settings_key, running_key, executing_key) diff --git a/test.ts b/test.ts index 9826f61..c440ae3 100644 --- a/test.ts +++ b/test.ts @@ -141,7 +141,10 @@ console.log(limiter.jobs()); console.log(limiter.jobs(Bottleneck.Status.RUNNING)); -group.deleteKey("pizza"); +group.deleteKey("pizza") +.then(function (deleted: boolean) { + console.log(deleted) +}); group.updateSettings({ timeout: 5, maxConcurrent: null, reservoir: null }); let keys: string[] = group.keys(); diff --git a/test/cluster.js b/test/cluster.js index b1f9a2e..f7b26ab 100644 --- a/test/cluster.js +++ b/test/cluster.js @@ -845,5 +845,83 @@ if (process.env.DATASTORE === 'redis' || process.env.DATASTORE === 'ioredis') { }) + it('Should not recreate a key when running heartbeat', function () { + c = makeTest() + var group = new Bottleneck.Group({ + datastore: process.env.DATASTORE, + clearDatastore: true, + maxConcurrent: 50, + minTime: 50, + timeout: 300, + heartbeatInterval: 5 + }) + var key = 'heartbeat' + + var limiter = group.key(key) + return c.pNoErrVal(limiter.schedule(c.promise, null, 1), 1) + .then(function () { + return limiter.done() + }) + .then(function (done) { + c.mustEqual(done, 1) + return c.wait(500) + }) + .then(function () { + return countKeys(limiter) + }) + .then(function (count) { + c.mustEqual(count, 0) + return group.disconnect(false) + }) + }) + + it('Should delete Redis key when manually deleting a group key', function () { + c = makeTest() + var group1 = new Bottleneck.Group({ + datastore: process.env.DATASTORE, + clearDatastore: true, + maxConcurrent: 50, + minTime: 50, + timeout: 300 + }) + var group2 = new Bottleneck.Group({ + datastore: process.env.DATASTORE, + clearDatastore: true, + maxConcurrent: 50, + minTime: 50, + timeout: 300 + }) + var key = 'deleted' + var limiter = group1.key(key) // only for countKeys() use + + return c.pNoErrVal(group1.key(key).schedule(c.promise, null, 1), 1) + .then(function () { + return c.pNoErrVal(group2.key(key).schedule(c.promise, null, 2), 2) + }) + .then(function () { + c.mustEqual(group1.keys().length, 1) + c.mustEqual(group2.keys().length, 1) + return group1.deleteKey(key) + }) + .then(function (deleted) { + c.mustEqual(deleted, true) + return countKeys(limiter) + }) + .then(function (count) { + c.mustEqual(count, 0) + c.mustEqual(group1.keys().length, 0) + c.mustEqual(group2.keys().length, 1) + return c.wait(200) + }) + .then(function () { + c.mustEqual(group1.keys().length, 0) + c.mustEqual(group2.keys().length, 0) + return Promise.all([ + group1.disconnect(false), + group2.disconnect(false) + ]) + }) + }) + }) } diff --git a/test/group.js b/test/group.js index 13cb290..f5356ab 100644 --- a/test/group.js +++ b/test/group.js @@ -193,7 +193,7 @@ describe('Group', function () { c.mustEqual(limiter2._store.storeOptions.minTime, 200) }) - it('Should support keys() and limiters()', function () { + it('Should support keys(), limiters(), deleteKey()', function () { c = makeTest() var group1 = new Bottleneck.Group({ maxConcurrent: 1 @@ -213,6 +213,17 @@ describe('Group', function () { c.mustEqual(limiter.key, keys[i]) assert(limiter.limiter instanceof Bottleneck) }) + + return group1.deleteKey(KEY_A) + .then(function (deleted) { + c.mustEqual(deleted, true) + c.mustEqual(group1.keys().length, 1) + return group1.deleteKey(KEY_A) + }) + .then(function (deleted) { + c.mustEqual(deleted, false) + c.mustEqual(group1.keys().length, 1) + }) }) it('Should call autocleanup', function () {