Skip to content

Commit

Permalink
update sched job
Browse files Browse the repository at this point in the history
  • Loading branch information
Lupino committed Mar 13, 2018
1 parent ee8649e commit 03f54c3
Showing 1 changed file with 30 additions and 26 deletions.
56 changes: 30 additions & 26 deletions periodic-server/src/Periodic/Server/Scheduler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -210,40 +210,44 @@ schedJob_ job@Job{..} = do
Nothing -> retry
Just FuncStat{sWorker=0} -> retry
Just st' -> pure st'
if sBroadcast then popAgentListThen $ doneSubmitJob True
else popAgentThen $ doneSubmitJob False
if sBroadcast then popAgentListThen
else popAgentThen

where popAgentThen :: MonadIO m => (AgentEnv -> SchedT m ()) -> SchedT m ()
popAgentThen done = do
where popAgentThen
:: MonadIO m => SchedT m ()
popAgentThen = do
SchedState{..} <- get
(jq, env0) <- liftIO $ atomically $ popAgentSTM sGrabQueue jFuncName
alive <- liftIO $ runAgentTWithEnv env0 aAlive
when alive $ do
liftIO $ IL.insert jq (jHandle job)
done env0

popAgentListThen :: MonadIO m => (AgentEnv -> SchedT m ()) -> SchedT m ()
popAgentListThen done = do
if alive then do
liftIO $ do
IL.insert jq (jHandle job)
nextSchedAt <- getEpochTime
PQ.insertJob sProcessJob job {jSchedAt = nextSchedAt}
r <- doSubmitJob env0
case r of
Left _ -> do
liftIO $ do
PQ.removeJob sProcessJob jFuncName (hashJobName jName)
IL.delete jq (jHandle job)
schedJob_ job
Right _ -> do
adjustFuncStat jFuncName
endSchedJob
else schedJob_ job

popAgentListThen :: MonadIO m => SchedT m ()
popAgentListThen = do
SchedState{..} <- get
agents <- liftIO $ popAgentList sGrabQueue jFuncName
mapM_ (done . snd) agents
unless (null agents) endSchedJob
mapM_ (doSubmitJob . snd) agents
adjustFuncStat jFuncName
unless (null agents) endSchedJob -- wait to resched the broadcast job

doneSubmitJob :: MonadIO m => Bool -> AgentEnv -> SchedT m ()
doneSubmitJob cast agent = do
doSubmitJob :: MonadIO m => AgentEnv -> SchedT m (Either SomeException ())
doSubmitJob agent = do
SchedState{..} <- get
unless cast . liftIO $ do
nextSchedAt <- getEpochTime
PQ.insertJob sProcessJob job { jSchedAt = nextSchedAt }

e <- liftIO . try $ assignJob agent job
case e of
Left (_::SomeException) ->
unless cast $ liftIO $
PQ.removeJob sProcessJob jFuncName (hashJobName jName)
Right _ -> do
adjustFuncStat jFuncName
endSchedJob
liftIO . try $ assignJob agent job

endSchedJob :: MonadIO m => SchedT m ()
endSchedJob = do
Expand Down

0 comments on commit 03f54c3

Please sign in to comment.