Skip to content

Commit

Permalink
Blacklist unresponsive limiters
Browse files — browse the repository at this point in the history
  • Loading branch information
Simon Grondin committed Jan 13, 2019
1 parent c7d2fe9 commit ca43f54
Show file tree
Hide file tree
Showing 14 changed files with 478 additions and 242 deletions.
348 changes: 210 additions & 138 deletions es5.js

Large diffs are not rendered by default.

25 changes: 15 additions & 10 deletions lib/Bottleneck.js
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ Bottleneck = function () {
var args, index, next, options, queue;

if (this.queued() === 0) {
return this.Promise.resolve(false);
return this.Promise.resolve(null);
}

queue = this._queues.getFirst();
Expand All @@ -281,7 +281,7 @@ Bottleneck = function () {
args = _next2.args;

if (capacity != null && options.weight > capacity) {
return this.Promise.resolve(false);
return this.Promise.resolve(null);
}

this.Events.trigger("debug", `Draining ${options.id}`, {
Expand Down Expand Up @@ -314,19 +314,24 @@ Bottleneck = function () {
}

this._run(next, wait, index, 0);
}

return this.Promise.resolve(success);
return this.Promise.resolve(options.weight);
} else {
return this.Promise.resolve(null);
}
});
});
}

_drainAll(capacity) {
return this._drainOne(capacity).then(success => {
if (success) {
return this._drainAll();
_drainAll(capacity, total = 0) {
return this._drainOne(capacity).then(drained => {
var newCapacity;

if (drained != null) {
newCapacity = capacity != null ? capacity - drained : capacity;
return this._drainAll(newCapacity, total + drained);
} else {
return this.Promise.resolve(success);
return this.Promise.resolve(total);
}
}).catch(e => {
return this.Events.trigger("error", e);
Expand Down Expand Up @@ -381,7 +386,7 @@ Bottleneck = function () {
done = options.dropWaitingJobs ? (this._run = next => {
return this._drop(next, options.dropErrorMessage);
}, this._drainOne = () => {
return this.Promise.resolve(false);
return this.Promise.resolve(null);
}, this._registerLock.schedule(() => {
return this._submitLock.schedule(() => {
var k, ref, v;
Expand Down
98 changes: 61 additions & 37 deletions lib/RedisDatastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ RedisDatastore = class RedisDatastore {
this.clientId = this.instance._randomIndex();
parser.load(storeInstanceOptions, storeInstanceOptions, this);
this.clients = {};
this.capacityPriorityCounters = {};
this.sharedConnection = this.connection != null;

if (this.connection == null) {
Expand Down Expand Up @@ -81,35 +82,58 @@ RedisDatastore = class RedisDatastore {
var _this2 = this;

return _asyncToGenerator(function* () {
var capacity, data, pos, priorityClient, type;
pos = message.indexOf(":");
var _ref2 = [message.slice(0, pos), message.slice(pos + 1)];
type = _ref2[0];
data = _ref2[1];
var capacity, counter, data, drained, e, newCapacity, pos, priorityClient, rawCapacity, type;

if (type === "capacity") {
return yield _this2.instance._drainAll(data.length > 0 ? ~~data : void 0);
} else if (type === "capacity-priority") {
var _data$split = data.split(":");

var _data$split2 = _slicedToArray(_data$split, 2);

capacity = _data$split2[0];
priorityClient = _data$split2[1];

if (priorityClient === _this2.clientId) {
yield _this2.instance._drainAll(capacity.length > 0 ? ~~capacity : void 0);
return yield _this2.clients.client.publish(_this2.instance.channel(), "capacity:");
} else {
yield new _this2.Promise(function (resolve, reject) {
return setTimeout(resolve, 500);
});
return yield _this2.instance._drainAll(capacity.length > 0 ? ~~capacity : void 0);
try {
pos = message.indexOf(":");
var _ref2 = [message.slice(0, pos), message.slice(pos + 1)];
type = _ref2[0];
data = _ref2[1];

if (type === "capacity") {
return yield _this2.instance._drainAll(data.length > 0 ? ~~data : void 0);
} else if (type === "capacity-priority") {
var _data$split = data.split(":");

var _data$split2 = _slicedToArray(_data$split, 3);

rawCapacity = _data$split2[0];
priorityClient = _data$split2[1];
counter = _data$split2[2];
capacity = rawCapacity.length > 0 ? ~~rawCapacity : void 0;

if (priorityClient === _this2.clientId) {
drained = yield _this2.instance._drainAll(capacity);
newCapacity = capacity != null ? capacity - (drained || 0) : "";
return yield _this2.clients.client.publish(_this2.instance.channel(), `capacity-priority:${newCapacity}::${counter}`);
} else if (priorityClient === "") {
clearTimeout(_this2.capacityPriorityCounters[counter]);
delete _this2.capacityPriorityCounters[counter];
return _this2.instance._drainAll(capacity);
} else {
return _this2.capacityPriorityCounters[counter] = setTimeout(
/*#__PURE__*/
_asyncToGenerator(function* () {
var e;

try {
delete _this2.capacityPriorityCounters[counter];
yield _this2.runScript("blacklist_client", [priorityClient]);
return yield _this2.instance._drainAll(capacity);
} catch (error) {
e = error;
return _this2.instance.Events.trigger("error", e);
}
}), 1000);
}
} else if (type === "message") {
return _this2.instance.Events.trigger("message", data);
} else if (type === "blocked") {
return yield _this2.instance._dropAllQueued();
}
} else if (type === "message") {
return _this2.instance.Events.trigger("message", data);
} else if (type === "blocked") {
return _this2.instance._dropAllQueued();
} catch (error) {
e = error;
return _this2.instance.Events.trigger("error", e);
}
})();
}
Expand Down Expand Up @@ -246,13 +270,13 @@ RedisDatastore = class RedisDatastore {
return _asyncToGenerator(function* () {
var reservoir, success, wait;

var _ref3 = yield _this7.runScript("register", _this7.prepareArray([index, weight, expiration]));
var _ref4 = yield _this7.runScript("register", _this7.prepareArray([index, weight, expiration]));

var _ref4 = _slicedToArray(_ref3, 3);
var _ref5 = _slicedToArray(_ref4, 3);

success = _ref4[0];
wait = _ref4[1];
reservoir = _ref4[2];
success = _ref5[0];
wait = _ref5[1];
reservoir = _ref5[2];
return {
success: _this7.convertBool(success),
wait,
Expand All @@ -268,13 +292,13 @@ RedisDatastore = class RedisDatastore {
var blocked, e, maxConcurrent, overweight, reachedHWM, strategy;

try {
var _ref5 = yield _this8.runScript("submit", _this8.prepareArray([queueLength, weight]));
var _ref6 = yield _this8.runScript("submit", _this8.prepareArray([queueLength, weight]));

var _ref6 = _slicedToArray(_ref5, 3);
var _ref7 = _slicedToArray(_ref6, 3);

reachedHWM = _ref6[0];
blocked = _ref6[1];
strategy = _ref6[2];
reachedHWM = _ref7[0];
blocked = _ref7[1];
strategy = _ref7[2];
return {
reachedHWM: _this8.convertBool(reachedHWM),
blocked: _this8.convertBool(blocked),
Expand Down
17 changes: 13 additions & 4 deletions lib/Scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ exports.allKeys = function (id) {
ZSET
client -> last job registered
*/
`b_${id}_client_last_registered`];
`b_${id}_client_last_registered`,
/*
ZSET
client -> last seen
*/
`b_${id}_client_last_seen`];
};

templates = {
Expand All @@ -57,9 +62,7 @@ templates = {
code: lua["init.lua"]
},
group_check: {
keys: function keys(id) {
return [`b_${id}_settings`];
},
keys: exports.allKeys,
headers: [],
refresh_expiration: false,
code: lua["group_check.lua"]
Expand All @@ -70,6 +73,12 @@ templates = {
refresh_expiration: false,
code: lua["register_client.lua"]
},
blacklist_client: {
keys: exports.allKeys,
headers: ["validate_keys"],
refresh_expiration: false,
code: lua["blacklist_client.lua"]
},
heartbeat: {
keys: exports.allKeys,
headers: ["validate_keys", "process_tick"],
Expand Down
7 changes: 4 additions & 3 deletions lib/lua.json

Large diffs are not rendered by default.

22 changes: 13 additions & 9 deletions light.js
Original file line number Diff line number Diff line change
Expand Up @@ -1054,12 +1054,12 @@
return this._registerLock.schedule(() => {
var args, index, next, options, queue;
if (this.queued() === 0) {
return this.Promise.resolve(false);
return this.Promise.resolve(null);
}
queue = this._queues.getFirst();
({options, args} = next = queue.first());
if ((capacity != null) && options.weight > capacity) {
return this.Promise.resolve(false);
return this.Promise.resolve(null);
}
this.Events.trigger("debug", `Draining ${options.id}`, {args, options});
index = this._randomIndex();
Expand All @@ -1076,18 +1076,22 @@
this.Events.trigger("depleted", empty);
}
this._run(next, wait, index, 0);
return this.Promise.resolve(options.weight);
} else {
return this.Promise.resolve(null);
}
return this.Promise.resolve(success);
});
});
}

_drainAll(capacity) {
return this._drainOne(capacity).then((success) => {
if (success) {
return this._drainAll();
_drainAll(capacity, total = 0) {
return this._drainOne(capacity).then((drained) => {
var newCapacity;
if (drained != null) {
newCapacity = capacity != null ? capacity - drained : capacity;
return this._drainAll(newCapacity, total + drained);
} else {
return this.Promise.resolve(success);
return this.Promise.resolve(total);
}
}).catch((e) => {
return this.Events.trigger("error", e);
Expand Down Expand Up @@ -1137,7 +1141,7 @@
done = options.dropWaitingJobs ? (this._run = (next) => {
return this._drop(next, options.dropErrorMessage);
}, this._drainOne = () => {
return this.Promise.resolve(false);
return this.Promise.resolve(null);
}, this._registerLock.schedule(() => {
return this._submitLock.schedule(() => {
var k, ref, v;
Expand Down
20 changes: 12 additions & 8 deletions src/Bottleneck.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ class Bottleneck

_drainOne: (capacity) =>
@_registerLock.schedule =>
if @queued() == 0 then return @Promise.resolve false
if @queued() == 0 then return @Promise.resolve null
queue = @_queues.getFirst()
{ options, args } = next = queue.first()
if capacity? and options.weight > capacity then return @Promise.resolve false
if capacity? and options.weight > capacity then return @Promise.resolve null
@Events.trigger "debug", "Draining #{options.id}", { args, options }
index = @_randomIndex()
@_store.__register__ index, options.weight, options.expiration
Expand All @@ -178,13 +178,17 @@ class Bottleneck
if empty then @Events.trigger "empty"
if reservoir == 0 then @Events.trigger "depleted", empty
@_run next, wait, index, 0
@Promise.resolve success
@Promise.resolve options.weight
else
@Promise.resolve null

_drainAll: (capacity) ->
_drainAll: (capacity, total=0) ->
@_drainOne(capacity)
.then (success) =>
if success then @_drainAll()
else @Promise.resolve success
.then (drained) =>
if drained?
newCapacity = if capacity? then capacity - drained else capacity
@_drainAll(newCapacity, total + drained)
else @Promise.resolve total
.catch (e) => @Events.trigger "error", e

_drop: (job, message="This job has been dropped by Bottleneck") ->
Expand All @@ -210,7 +214,7 @@ class Bottleneck
resolve()
done = if options.dropWaitingJobs
@_run = (next) => @_drop next, options.dropErrorMessage
@_drainOne = => @Promise.resolve false
@_drainOne = => @Promise.resolve null
@_registerLock.schedule => @_submitLock.schedule =>
for k, v of @_scheduled
if @jobStatus(v.job.options.id) == "RUNNING"
Expand Down
46 changes: 30 additions & 16 deletions src/RedisDatastore.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class RedisDatastore
@clientId = @instance._randomIndex()
parser.load storeInstanceOptions, storeInstanceOptions, @
@clients = {}
@capacityPriorityCounters = {}
@sharedConnection = @connection?

@connection ?= if @instance.datastore == "redis" then new RedisConnection { @clientOptions, @Promise, Events: @instance.Events }
Expand All @@ -33,22 +34,35 @@ class RedisDatastore
client.publish(@instance.channel(), "message:#{message.toString()}")

onMessage: (channel, message) ->
pos = message.indexOf(":")
[type, data] = [message.slice(0, pos), message.slice(pos+1)]
if type == "capacity"
await @instance._drainAll(if data.length > 0 then ~~data)
else if type == "capacity-priority"
[capacity, priorityClient] = data.split(":")
if priorityClient == @clientId
await @instance._drainAll(if capacity.length > 0 then ~~capacity)
await @clients.client.publish(@instance.channel(), "capacity:")
else
await (new @Promise (resolve, reject) -> setTimeout resolve, 500)
await @instance._drainAll(if capacity.length > 0 then ~~capacity)
else if type == "message"
@instance.Events.trigger "message", data
else if type == "blocked"
@instance._dropAllQueued()
try
pos = message.indexOf(":")
[type, data] = [message.slice(0, pos), message.slice(pos+1)]
if type == "capacity"
await @instance._drainAll(if data.length > 0 then ~~data)
else if type == "capacity-priority"
[rawCapacity, priorityClient, counter] = data.split(":")
capacity = if rawCapacity.length > 0 then ~~rawCapacity
if priorityClient == @clientId
drained = await @instance._drainAll(capacity)
newCapacity = if capacity? then capacity - (drained or 0) else ""
await @clients.client.publish(@instance.channel(), "capacity-priority:#{newCapacity}::#{counter}")
else if priorityClient == ""
clearTimeout @capacityPriorityCounters[counter]
delete @capacityPriorityCounters[counter]
@instance._drainAll(capacity)
else
@capacityPriorityCounters[counter] = setTimeout =>
try
delete @capacityPriorityCounters[counter]
await @runScript "blacklist_client", [priorityClient]
await @instance._drainAll(capacity)
catch e then @instance.Events.trigger "error", e
, 1000
else if type == "message"
@instance.Events.trigger "message", data
else if type == "blocked"
await @instance._dropAllQueued()
catch e then @instance.Events.trigger "error", e

__disconnect__: (flush) ->
clearInterval @heartbeat
Expand Down
Loading

0 comments on commit ca43f54

Please sign in to comment.