-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathTransportServer.js
69 lines (53 loc) · 1.97 KB
/
TransportServer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
const { QOS_LEVELS } = require('./constants');
const buildTopicRegExp = require('./buildTopicRegExp');
/**
 * Server side of a request/response transport that runs on top of an MQTT
 * client.
 *
 * Incoming messages whose topic matches `inTopic` are handed (as strings) to
 * the callback registered through `onData`; every truthy value the callback
 * resolves to is published back on a response topic.
 */
class MQTTTransportServer {
  /**
   * @param {object} options
   * @param {object} options.mqttClient - Client object; this class calls its
   *   `on`, `off`, `subscribe`, `unsubscribe` and `publish` methods. Required.
   * @param {*} options.inTopic - Topic to subscribe to. It is also passed to
   *   `buildTopicRegExp` to filter incoming messages. Required.
   * @param {string|Function} [options.outTopic] - Fallback response topic, or
   *   a function called with `{ inTopic }` (the topic of the received message)
   *   that returns the response topic. Only consulted when the incoming packet
   *   carries no `properties.responseTopic`.
   * @param {*} [options.inQos=QOS_LEVELS.AT_MOST_ONCE] - `qos` option passed
   *   to `subscribe`.
   * @param {*} [options.outQos=QOS_LEVELS.AT_MOST_ONCE] - `qos` option passed
   *   to `publish`.
   * @throws {Error} When `mqttClient` or `inTopic` is missing.
   */
  constructor({
    mqttClient,
    inTopic,
    outTopic,
    inQos = QOS_LEVELS.AT_MOST_ONCE,
    outQos = QOS_LEVELS.AT_MOST_ONCE
  }) {
    if (!mqttClient) throw new Error('"mqttClient" required');
    if (!inTopic) throw new Error('"inTopic" required');

    this.mqttClient = mqttClient;
    this.inTopic = inTopic;
    this.outTopic = outTopic;
    this.inQos = inQos;
    this.outQos = outQos;

    // Placeholder until onData() installs the real listener.
    this.messageHandler = () => {};
  }

  /**
   * Registers `callback` as the request handler and subscribes to `inTopic`.
   *
   * Only one handler is tracked per instance: calling `onData` again replaces
   * the previously registered handler instead of stacking a second listener.
   *
   * @param {Function} callback - Called with the message payload converted to
   *   a string. May return a promise. A falsy result means "no response".
   * @returns {Promise<void>} Settles once the subscribe call has settled.
   */
  async onData(callback) {
    const inTopicRegExp = buildTopicRegExp(this.inTopic);

    // Detach the handler installed by an earlier onData() call. Otherwise it
    // would keep answering requests and shutdown() could no longer remove it,
    // because only the most recent handler is remembered.
    this.mqttClient.off('message', this.messageHandler);

    // NOTE(review): this handler is async and is invoked by the client's
    // event emitter, so nothing visible in this file awaits it. A rejection
    // here (callback failure, missing outTopic, publish failure) is therefore
    // unobserved -- confirm how callers expect such errors to be surfaced.
    this.messageHandler = async (topic, messageBuffer, packet) => {
      if (!inTopicRegExp.test(topic)) {
        return;
      }

      const responseData = await callback(messageBuffer.toString());

      if (!responseData) {
        return;
      }

      // Guard the lookup so a missing packet falls back to `outTopic`
      // instead of raising a TypeError.
      const responseTopic =
        packet && packet.properties && packet.properties.responseTopic;

      let outTopic;

      if (responseTopic) {
        // Supported only by MQTT 5.0
        outTopic = responseTopic;
      } else if (typeof this.outTopic === 'function') {
        outTopic = this.outTopic({ inTopic: topic });
      } else {
        outTopic = this.outTopic;
      }

      if (!outTopic) {
        throw new Error('"outTopic" is not specified');
      }

      await this.mqttClient.publish(outTopic, responseData, { qos: this.outQos });
    };

    this.mqttClient.on('message', this.messageHandler);

    await this.mqttClient.subscribe(this.inTopic, { qos: this.inQos });
  }

  /**
   * Unsubscribes from `inTopic` and detaches the message listener.
   *
   * The listener is removed even when the unsubscribe call fails, so a failed
   * shutdown does not leave a live handler attached to the client.
   *
   * @returns {Promise<void>} Rejects with the unsubscribe error, if any.
   */
  async shutdown() {
    try {
      await this.mqttClient.unsubscribe(this.inTopic);
    } finally {
      this.mqttClient.off('message', this.messageHandler);
      this.messageHandler = () => {};
    }
  }
}
// CommonJS export: the class itself is this module's export value.
module.exports = MQTTTransportServer;