@@ -73,14 +73,16 @@ data SchedConfig = SchedConfig
73
73
, sCleanup :: IO ()
74
74
}
75
75
76
+ type Task m = Async (StM (SchedT m ) () )
77
+
76
78
data SchedState m = SchedState
77
79
{ sFuncStatList :: FuncStatList
78
80
, sLocker :: L. Lock
79
81
, sGrabQueue :: GrabQueue
80
82
, sJobQueue :: JobQueue
81
83
, sProcessJob :: ProcessQueue
82
84
, sAlive :: TVar Bool
83
- , sSchedJobQ :: IOHashMap JobHandle (Async ( StM ( SchedT m ) () ) )
85
+ , sTaskList :: IOHashMap JobHandle (Task m )
84
86
}
85
87
86
88
type SchedT m = StateT (SchedState m ) (ReaderT SchedConfig m )
@@ -103,7 +105,7 @@ initSchedState = do
103
105
sGrabQueue <- newGrabQueue
104
106
sJobQueue <- newIOHashMap
105
107
sProcessJob <- newIOHashMap
106
- sSchedJobQ <- newIOHashMap
108
+ sTaskList <- newIOHashMap
107
109
sAlive <- newTVarIO True
108
110
pure SchedState {.. }
109
111
@@ -131,7 +133,7 @@ pollJob :: (MonadIO m, MonadBaseControl IO m) => SchedT m ()
131
133
pollJob = do
132
134
SchedState {.. } <- get
133
135
SchedConfig {.. } <- lift ask
134
- mapM_ checkPoll =<< liftIO (FL. toList sSchedJobQ )
136
+ mapM_ checkPoll =<< liftIO (FL. toList sTaskList )
135
137
136
138
next <- liftIO $ (+ sSchedDelay * 2 ) <$> getEpochTime
137
139
mapM_ checkJob =<< liftIO (JQ. findLessJob sJobQueue next)
@@ -140,21 +142,20 @@ pollJob = do
140
142
:: (MonadIO m , MonadBaseControl IO m )
141
143
=> Job -> SchedT m ()
142
144
checkJob job@ Job {.. } = do
143
- SchedState {.. } <- get
144
- SchedConfig {.. } <- lift ask
145
- w <- liftIO $ FL. lookup sSchedJobQ (jHandle job)
145
+ w <- findTask job
146
+ schedDelay <- lift $ asks sSchedDelay
146
147
unless (isJust w) $ do
147
- now <- liftIO $ getEpochTime
148
- when (jSchedAt > now || jSchedAt + sSchedDelay < now) $ reSchedJob job
148
+ now <- liftIO getEpochTime
149
+ when (jSchedAt > now || jSchedAt + schedDelay < now) $ reSchedJob job
149
150
150
151
checkPoll
151
152
:: (MonadIO m , MonadBaseControl IO m )
152
153
=> (JobHandle , Async (StM (SchedT m ) () )) -> SchedT m ()
153
154
checkPoll (jh, w) = do
154
- SchedState { .. } <- get
155
+ taskList <- gets sTaskList
155
156
r <- poll w
156
157
case r of
157
- Just (Right () ) -> liftIO $ FL. delete sSchedJobQ jh
158
+ Just (Right () ) -> liftIO $ FL. delete taskList jh
158
159
_ -> pure ()
159
160
160
161
@@ -176,27 +177,32 @@ pushJob job@Job{..} = do
176
177
177
178
reSchedJob :: (MonadIO m , MonadBaseControl IO m ) => Job -> SchedT m ()
178
179
reSchedJob job = do
179
- SchedState { .. } <- get
180
- SchedConfig { .. } <- lift ask
181
- w <- liftIO $ FL. lookup sSchedJobQ (jHandle job)
180
+ schedDelay <- lift $ asks sSchedDelay
181
+ taskList <- gets sTaskList
182
+ w <- findTask job
182
183
when (isJust w) $ do
183
184
cancel (fromJust w)
184
- liftIO $ FL. delete sSchedJobQ (jHandle job)
185
+ liftIO $ FL. delete taskList (jHandle job)
185
186
186
- next <- liftIO $ (+ sSchedDelay * 2 ) <$> getEpochTime
187
+ next <- liftIO $ (+ schedDelay * 2 ) <$> getEpochTime
187
188
when (jSchedAt job < next) $ do
188
189
w' <- schedJob job
189
- liftIO $ FL. insert sSchedJobQ (jHandle job) w'
190
+ liftIO $ FL. insert taskList (jHandle job) w'
191
+
192
+ findTask :: (MonadIO m ) => Job -> SchedT m (Maybe (Task m ))
193
+ findTask job = do
194
+ taskList <- gets sTaskList
195
+ liftIO $ FL. lookup taskList (jHandle job)
190
196
191
197
schedJob
192
198
:: (MonadIO m , MonadBaseControl IO m )
193
- => Job -> SchedT m (Async ( StM ( SchedT m ) () ) )
199
+ => Job -> SchedT m (Task m )
194
200
schedJob job = async $ schedJob_ job
195
201
196
202
schedJob_ :: MonadIO m => Job -> SchedT m ()
197
203
schedJob_ job@ Job {.. } = do
198
204
SchedState {.. } <- get
199
- now <- liftIO $ getEpochTime
205
+ now <- liftIO getEpochTime
200
206
when (jSchedAt > now) . liftIO . threadDelay . fromIntegral $ (jSchedAt - now) * 1000000
201
207
FuncStat {.. } <- liftIO . atomically $ do
202
208
st <- FL. lookupSTM sFuncStatList jFuncName
@@ -245,7 +251,7 @@ schedJob_ job@Job{..} = do
245
251
SchedState {.. } <- get
246
252
liftIO $ do
247
253
JQ. removeJob sJobQueue jFuncName jName
248
- FL. delete sSchedJobQ (jHandle job)
254
+ FL. delete sTaskList (jHandle job)
249
255
250
256
adjustFuncStat :: MonadIO m => FuncName -> SchedT m ()
251
257
adjustFuncStat fn = do
@@ -279,10 +285,10 @@ removeJob job = do
279
285
280
286
adjustFuncStat (jFuncName job)
281
287
282
- w <- liftIO $ FL. lookup sSchedJobQ (jHandle job)
288
+ w <- findTask job
283
289
when (isJust w) $ do
284
290
cancel (fromJust w)
285
- liftIO $ FL. delete sSchedJobQ (jHandle job)
291
+ liftIO $ FL. delete sTaskList (jHandle job)
286
292
287
293
dumpJob :: MonadIO m => SchedT m [Job ]
288
294
dumpJob = do
@@ -343,7 +349,7 @@ failJob jh = do
343
349
SchedState {.. } <- get
344
350
job <- liftIO $ PQ. lookupJob sProcessJob fn jn
345
351
when (isJust job) $ do
346
- nextSchedAt <- liftIO $ getEpochTime
352
+ nextSchedAt <- liftIO getEpochTime
347
353
retryJob ((fromJust job) {jSchedAt = nextSchedAt})
348
354
349
355
where (fn, jn) = unHandle jh
@@ -390,7 +396,7 @@ status = liftIO . FL.elems =<< gets sFuncStatList
390
396
391
397
revertProcessQueue :: (MonadIO m , MonadBaseControl IO m ) => SchedT m ()
392
398
revertProcessQueue = do
393
- now <- liftIO $ getEpochTime
399
+ now <- liftIO getEpochTime
394
400
queue <- gets sProcessJob
395
401
mapM_ (failJob . jHandle)
396
402
=<< filter (isTimeout now) <$> liftIO (PQ. dumpJob queue)
@@ -406,6 +412,6 @@ shutdown = do
406
412
writeTVar sAlive False
407
413
return t
408
414
when alive $ do
409
- mapM_ cancel =<< liftIO (FL. elems sSchedJobQ )
415
+ mapM_ cancel =<< liftIO (FL. elems sTaskList )
410
416
saveJob
411
417
void . async $ liftIO sCleanup
0 commit comments