-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclock.js
77 lines (67 loc) · 2.22 KB
/
clock.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
const CronJob = require('cron').CronJob
const amqp = require('amqp-connection-manager')
// ENV
// Broker URL: CLOUDAMQP_URL when it is set to a non-empty value, otherwise a
// broker on localhost. Because of that fallback the value is never empty, so
// there is no "missing URL" exit path here.
const AMQP_URL = process.env.CLOUDAMQP_URL || 'amqp://localhost';

// GLOBALS
const WORKER_QUEUE = 'worker-queue'; // To consume from worker process
const CLOCK_QUEUE = 'clock-queue'; // To consume from clock process

// Jobs scheduled by startCronProcess. Multiple are possible -> these jobs
// could be stored in a database.
//   name     - human-readable label for the job
//   message  - JSON payload handed to sendMessage on every tick; its `queue`
//              field selects the destination queue
//   cronTime - cron expression for the schedule (startCronProcess falls back
//              to `new Date(job.dateTime)` when this is missing)
//   repeat   - truthy keeps the job running; falsy stops it after its first tick
const JOBS = [{
  name: 'Cron process 1',
  message: { taskName: 'getNotes', queue: WORKER_QUEUE },
  // NOTE: with standard cron step syntax "*/50" in the minute field matches
  // minutes 0 and 50 of every hour, so the gaps alternate between 50 and 10
  // minutes. This is NOT a fixed 50-minute interval.
  cronTime: '*/50 * * * *',
  repeat: 1,
}];
// RABBITMQ
// Create a new connection manager from AMQP. The manager keeps reconnecting
// on its own and emits 'connect' again after every successful reconnect.
const connection = amqp.connect([AMQP_URL]);
console.log('[AMQP] - Connecting...');

// True once the first successful connection has been handled. Without this
// guard every reconnect would schedule the cron jobs again (duplicate ticks)
// and stack up another SIGINT listener.
let cronStarted = false;

connection.on('connect', function() {
  if (cronStarted) {
    // Reconnect: the cron jobs and the SIGINT hook are already in place.
    console.log('[AMQP] - Connected!');
    return;
  }
  cronStarted = true;
  process.once('SIGINT', function() { // Close conn on exit
    connection.close();
  });
  console.log('[AMQP] - Connected!');
  return startCronProcess(JOBS);
});

connection.on('disconnect', function(params) {
  // The error may be absent from the event payload; log whatever is
  // available instead of throwing inside the event handler.
  const err = params && params.err;
  return console.error('[AMQP] - Disconnected.', err && err.stack ? err.stack : err);
});
// FUNCTIONS
/**
 * Create and start one CronJob per job description.
 *
 * Each job is scheduled from its `cronTime`; when that is missing, a Date
 * built from `job.dateTime` is used instead. On every tick the job's
 * `message` is passed to sendMessage, and a job whose `repeat` is falsy is
 * stopped right after that tick. All jobs run in the
 * 'America/Argentina/Buenos_Aires' time zone and are started immediately.
 *
 * @param {Array<Object>} jobs - job descriptions (`cronTime` or `dateTime`,
 *   `message`, `repeat`); a missing or empty list is a no-op.
 * @returns {undefined}
 */
const startCronProcess = (jobs) => {
  const hasJobs = Boolean(jobs && jobs.length);
  if (!hasJobs) {
    return;
  }

  jobs.forEach((spec) => {
    // A cron expression wins; otherwise fall back to a one-off date.
    const schedule = spec.cronTime || new Date(spec.dateTime);

    const handleTick = () => {
      // `message` and `repeat` are read at tick time, not at scheduling time.
      sendMessage(spec.message);
      if (!spec.repeat) {
        cron.stop();
      }
    };

    const handleComplete = () => {
      console.log('Job completed! Removing now...');
    };

    const cron = new CronJob({
      cronTime: schedule,
      onTick: handleTick,
      onComplete: handleComplete,
      timeZone: 'America/Argentina/Buenos_Aires',
      start: true, // Start now
    });
  });
};
/**
 * Publish one message to a queue through a short-lived channel wrapper.
 *
 * The destination is `data.queue`, or WORKER_QUEUE when the message does not
 * name one. A channel wrapper is created on the shared `connection` with
 * `json: true` and a setup step that asserts the queue as durable. The
 * message is sent as persistent with content type 'application/json', and
 * the wrapper is closed once the send either succeeds or is rejected.
 *
 * @param {Object} data - message payload; a falsy value is ignored.
 * @returns {undefined} nothing is returned; the outcome is only logged.
 */
const sendMessage = (data) => {
  const payload = data;
  if (!payload) {
    return;
  }

  const targetQueue = payload.queue ? payload.queue : WORKER_QUEUE;

  const channelOptions = {
    json: true,
    setup(channel) {
      return channel.assertQueue(targetQueue, { durable: true });
    },
  };
  const publishOptions = { contentType: 'application/json', persistent: true };

  const wrapper = connection.createChannel(channelOptions);

  const onSent = () => {
    console.log('[AMQP] - Message sent to queue =>', targetQueue);
    wrapper.close();
  };
  const onRejected = (err) => {
    console.error('[AMQP] - Message to queue => ' + targetQueue + '<= was rejected! ', err.stack);
    wrapper.close();
  };

  wrapper
    .sendToQueue(targetQueue, payload, publishOptions)
    .then(onSent)
    .catch(onRejected);
};