This repository has been archived by the owner on Nov 12, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdispatcher.js
82 lines (80 loc) · 2.53 KB
/
dispatcher.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
const { EventEmitter } = require("./common");
class Dispatcher extends EventEmitter {
constructor() {
super();
this.workers = new Map();
this.sockets = new Map();
this.jobs = new Map();
this.connectedWorkers = [];
this.transports = [];
this.index = 0;
}
addTransport(transport) {
this.transports.push(transport);
transport.on("call", (...args) => this.call(...args));
transport.on("getPaths", (...args) => this.getPaths(...args));
transport.on("registerWorker", (...args) => this.registerWorker(...args));
transport.start();
}
call(socket, { path, args }, id) {
const worker = this.getWorker(path);
if (worker) {
this.sockets.set(id, socket);
this.send(worker, { instruction: "call", details: { path, args } }, id);
} else {
this.addJob(socket, { path, args }, id);
}
}
getPaths(socket, { path }, id) {
const paths = [...this.workers.keys()].filter(p => p.startsWith(path));
this.send(socket, { instruction: "paths", details: { paths } }, id);
}
registerWorker(worker, { paths }, id) {
for (const path of paths)
this.workers.set(path, [...(this.workers.get(path) || []), worker]);
worker.on("message", data => this.handleWorkerResponse(data));
for (const path of paths) {
const jobs = this.jobs.get(path) || [];
this.jobs.set(path, []);
for (const job of jobs) this.call(...job);
}
this.connectedWorkers.push(worker);
const listeners = this.listeners.workerConnected || [];
listeners.forEach(fn => fn.call(this, worker));
}
addJob(socket, { path, args }, id) {
this.jobs.set(path, [
...(this.jobs.get(path) || []),
[socket, { path, args }, id],
]);
}
handleWorkerResponse(data) {
const { instruction, details, id } = data;
const socket = this.sockets.get(id);
this.send(socket, { instruction, details }, id);
}
getWorker(path) {
const workers = this.workers.get(path);
if (!workers) return;
const { length } = workers;
const index = Math.floor(Math.random() * length);
return workers[index];
}
send(socket, data, id) {
socket.send({ ...data, id });
}
expectWorkers(n) {
return new Promise(resolve => {
if (this.connectedWorkers.length >= n) resolve();
const waitForN = () => {
const { length } = this.connectedWorkers;
if (length >= n) {
this.off("workerConnected", waitForN);
resolve();
}
};
this.on("workerConnected", waitForN);
});
}
}
module.exports.Dispatcher = Dispatcher;