Skip to content

Commit

Permalink
periodic-server: poll job with TMVar lock
Browse files Browse the repository at this point in the history
  • Loading branch information
Lupino committed Jan 27, 2022
1 parent 591f320 commit 85c57ff
Showing 1 changed file with 10 additions and 24 deletions.
34 changes: 10 additions & 24 deletions periodic-server/src/Periodic/Server/Scheduler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ import System.Log.Logger (debugM, infoM)
import UnliftIO hiding (poll)
import UnliftIO.Concurrent (threadDelay)

data PollJob = PollJob
deriving (Show, Eq)

type Waiter = (Nid, Msgid)

data WaitItem = WaitItem
Expand Down Expand Up @@ -121,7 +118,7 @@ data SchedEnv db = SchedEnv
, sLocker :: L.Lock
, sGrabQueue :: GrabQueue
-- sched state, when false sched is exited.
, sPollJob :: TVar (Maybe PollJob)
, sPollJob :: TMVar ()
, sChanList :: TQueue Job
, sWaitList :: WaitList
, sLockList :: LockList
Expand Down Expand Up @@ -170,7 +167,7 @@ initSchedEnv config sGrabQueue sC sAssignJob sPushData sHook = do
sWaitList <- IOMap.empty
sLockList <- IOMap.empty
sLocker <- L.new
sPollJob <- newTVarIO Nothing
sPollJob <- newEmptyTMVarIO
sChanList <- newTQueueIO
sRevertInterval <- newTVarIO 300
sTaskTimeout <- newTVarIO 600
Expand All @@ -183,7 +180,7 @@ initSchedEnv config sGrabQueue sC sAssignJob sPushData sHook = do
sAssignJobTime <- IOMap.empty
sMaxPoolSize <- newTVarIO 10
sSchedPool <- newSchedPool sMaxPoolSize sMaxBatchSize
(writeTVar sPollJob (Just PollJob)) $ \fn -> do
(void $ tryPutTMVar sPollJob ()) $ \fn -> do
mFuncStat <- IOMapS.lookup fn sFuncStatList
case mFuncStat of
Nothing -> pure $ Just []
Expand All @@ -210,7 +207,7 @@ startSchedT = do
runTask 0 runPushJob
runTask 100 purgeExpired
runTask 60 revertLockedQueue
runTask 60 $ pushPollJob PollJob
runTask 60 pushPollJob
runTask 100 purgeEmptyLock
runTask 0 $ runSchedPool sSchedPool schedJob $ retryLater 10

Expand Down Expand Up @@ -280,19 +277,8 @@ runTask_ d m = do
runPollJob :: (MonadUnliftIO m, Persist db) => SchedT db m ()
runPollJob = do
cl <- asks sPollJob
al <- asks sAlive
mPollJob <- atomically $ do
mPollJob <- readTVar cl
case mPollJob of
Nothing -> do
st <- readTVar al
if st then retrySTM
else pure Nothing
Just action -> do
writeTVar cl Nothing
pure $ Just action

mapM_ (const pollJob) mPollJob
atomically $ takeTMVar cl
pollJob

pollJob :: (MonadIO m, Persist db) => SchedT db m ()
pollJob = do
Expand Down Expand Up @@ -322,10 +308,10 @@ getAvaliableFuncList = do
foldFunc (fn, _) acc = fn:acc


pushPollJob :: MonadIO m => PollJob -> SchedT db m ()
pushPollJob act = do
pushPollJob :: MonadIO m => SchedT db m ()
pushPollJob = do
cl <- asks sPollJob
atomically $ writeTVar cl (Just act)
void $ atomically $ tryPutTMVar cl ()


pushChanJob :: MonadIO m => Job -> SchedT db m ()
Expand Down Expand Up @@ -474,7 +460,7 @@ alterFunc n f = do
SchedEnv{..} <- ask
IOMap.alter f n sFuncStatList
liftIO $ P.insertFuncName sPersist n
pushPollJob PollJob
pushPollJob

addFunc :: (MonadIO m, Persist db) => FuncName -> SchedT db m ()
addFunc n = broadcastFunc n False
Expand Down

0 comments on commit 85c57ff

Please sign in to comment.