-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path: worker.coffee
145 lines (118 loc) · 4.95 KB
/
worker.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
WritableStream = Npm.require('stream').Writable
# Server-side worker which processes jobs from the job collection. Reopens
# (extends) the existing JobsWorker class to add processing logic.
class JobsWorker extends JobsWorker
  # Number of parallel worker instances, read from the environment.
  @WORKER_INSTANCES: parseInt(process.env.WORKER_INSTANCES || '1')
  # Guard against a non-numeric environment value (parseInt can yield NaN).
  @WORKER_INSTANCES: if _.isFinite(@WORKER_INSTANCES) then @WORKER_INSTANCES else 1

  # Base interval for the stalled-job check of a single instance; at runtime
  # it is multiplied by the number of instances (see @start).
  @STALLED_JOB_CHECK_INTERVAL: 60 * 1000 # ms
  # Base interval for promoting future-scheduled jobs to ready; at runtime
  # it is multiplied by the number of instances (see @start).
  @PROMOTE_INTERVAL: 15 * 1000 # ms

  # Re-entrancy guard so that only one job-queue drain loop runs at a time
  # (see @_runJobQueue).
  @_jobQueueRunning: false

  # Initializes the worker: merges options with class-level defaults, silences
  # per-call logging of the job collection while keeping error logging, routes
  # the collection's log stream through @_log, and ensures indexes used by
  # the job queries.
  @initialize: (@options={}) ->
    super

    @options = _.defaults {}, @options,
      workerInstances: @WORKER_INSTANCES
      stalledJobCheckInterval: @STALLED_JOB_CHECK_INTERVAL
      promoteInterval: @PROMOTE_INTERVAL

    # To prevent logging of all calls while keeping logging of errors.
    # TODO: Replace with a better solution which overrides an event handler method.
    # See https://github.com/vsivsi/meteor-job-collection/pull/123
    @collection._callListener.removeListener 'call', @collection._callListener._events.call[0]

    # Redirect the job collection's log stream into our own logger.
    writableStream = new WritableStream
      objectMode: true

    writableStream._write = (chunk, encoding, callback) =>
      @_log chunk
      callback null

    @collection.setLogStream writableStream

    # Index supporting lookups by job type and status (used by the
    # stalled-job check's status query, among others).
    @collection._ensureIndex
      type: 1
      status: 1

    # Index matching the sort used when observing for ready work
    # (see @_startProcessingJobs).
    @collection._ensureIndex
      priority: 1
      retryUntil: 1
      after: 1

  # Logs one entry from the job collection's log stream.
  # NOTE(review): timestamp and userId are destructured but deliberately not
  # included in the log line.
  @_log: (data) ->
    {timestamp, userId, method, message} = data
    Log.info "#{method}: #{message}"

  # Starts the worker: schedules periodic promotion of future jobs, periodic
  # stalled-job checks, and the processing of ready jobs. Does nothing
  # (besides a log message) when workerInstances is 0.
  @start: ->
    # Worker is disabled.
    return Log.info "Worker disabled" unless @options.workerInstances

    # We randomly delay start so that not all instances are promoting
    # at the same time, but dispersed over the whole interval.
    Meteor.setTimeout =>
      # Check for promoted jobs at this interval. Jobs scheduled in the
      # future have to be made ready at regular intervals because time-based
      # queries are not reactive: time < NOW, where NOW does not change as
      # time goes on once you make a query. The more instances we have, the
      # less frequently each particular instance should check.
      @collection.promote @options.workerInstances * @options.promoteInterval
      @_startProcessingJobs()
    ,
      Random.fraction() * @options.workerInstances * @options.promoteInterval

    # Same deal with delaying and spreading the interval based on
    # the number of worker instances that we have for job promotion.
    Meteor.setTimeout =>
      # We check for stalled jobs ourselves (and not use workTimeout)
      # so that each job class can define a different timeout.
      Meteor.setInterval =>
        @collection.find(status: 'running').forEach (jobQueueItem, index, cursor) =>
          try
            jobClass = Job.types[jobQueueItem.type]
            # Still within this job class' timeout since the last update;
            # not stalled.
            return if new Date().valueOf() < jobQueueItem.updated.valueOf() + jobClass.timeout
            job = @_makeJob jobQueueItem
            job.fail "No progress for more than #{jobClass.timeout / 1000} seconds."
          catch error
            Log.error "Error while canceling a stalled job #{jobQueueItem.type}/#{jobQueueItem._id}: #{error.stack or error}"
      ,
        @options.workerInstances * @options.stalledJobCheckInterval
    ,
      Random.fraction() * @options.workerInstances * @options.stalledJobCheckInterval

  # Starts the job server and observes the collection for ready jobs,
  # triggering the drain loop whenever a ready job appears or changes.
  @_startProcessingJobs: ->
    @collection.startJobServer()
    Log.info "Worker enabled"

    # The query and sort here is based on the query in jobCollection's getWork query. We want to have a query which is
    # the same, just that we observe with it and when there is any change, we call getWork itself.
    @collection.find(
      status: 'ready'
      runId: null
    ,
      sort:
        priority: 1
        retryUntil: 1
        after: 1
      fields:
        _id: 1
    ).observeChanges
      added: (id, fields) =>
        @_runJobQueue()
      changed: (id, fields) =>
        @_runJobQueue()

  # Drains the job queue: repeatedly calls getWork for all known job types
  # and runs each returned job, recording success (job.done) or failure
  # (job.fail) on the job document. Re-entrancy is prevented with the
  # @_jobQueueRunning flag, which is always cleared in the finally clause.
  @_runJobQueue: ->
    return if @_jobQueueRunning
    @_jobQueueRunning = true

    # We defer so that we can return quick so that observe keeps
    # running. We run here in a loop until there is no more work
    # when we go back to observe to wait for next ready job.
    Meteor.defer =>
      try
        loop
          jobs = @collection.getWork _.keys Job.types
          break unless jobs?.length
          for job in jobs
            try
              try
                jobInstance = Job.fromQueueJob job
                result = jobInstance.run()
              catch error
                if error instanceof Error
                  # Resolve a (possibly source-mapped) stack trace for the
                  # failure report stored on the job.
                  stackframes = Promise.await StackTrace.fromError error
                  stack = (stackframe.toString() for stackframe in stackframes)
                  job.fail EJSON.toJSONValue(value: error.message, stack: stack),
                    fatal: error instanceof Job.FatalJobError
                else
                  # Non-Error throw (e.g. a string); record its string form.
                  job.fail EJSON.toJSONValue(value: "#{error}")
                # Failure has been recorded on the job; move to the next one.
                continue
              job.done EJSON.toJSONValue result
            catch error
              # job.fail/job.done themselves can throw; log and keep draining.
              Log.error "Error running a job queue: #{error.stack or error}"
      finally
        @_jobQueueRunning = false

  # Immediately triggers the job collection's internal promotion of
  # scheduled jobs to ready.
  @promote: ->
    @collection._promote_jobs()