Skip to content

Commit 416fb66

Browse files
committed
monad trans Periodic.Server
1 parent e14dfc3 commit 416fb66

File tree

2 files changed

+163
-107
lines changed

2 files changed

+163
-107
lines changed

periodic-server/periodicd.cabal

-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ library
2929
, network
3030
, unordered-containers
3131
, psqueues
32-
, unix
3332
, directory
3433
, filepath
3534
, hslogger
+163-106
Original file line numberDiff line numberDiff line change
@@ -1,134 +1,177 @@
1-
{-# LANGUAGE ScopedTypeVariables #-}
1+
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE MultiParamTypeClasses #-}
3+
{-# LANGUAGE OverloadedStrings #-}
4+
{-# LANGUAGE RecordWildCards #-}
5+
{-# LANGUAGE ScopedTypeVariables #-}
26

37
module Periodic.Server
48
(
59
startServer
610
) where
711

8-
import Control.Monad (forever, void, when)
9-
import Network.Socket (Socket, accept)
10-
import qualified Network.Socket as Socket (close)
11-
12-
-- process
13-
import Control.Concurrent (forkIO, killThread, threadDelay)
14-
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar,
15-
tryPutMVar)
16-
import System.Posix.Signals (Handler (Catch), installHandler,
17-
sigINT, sigTERM)
18-
19-
-- server
20-
import Control.Exception (SomeException)
21-
import Control.Monad.Catch (try)
22-
import Control.Monad.IO.Class (liftIO)
23-
import Control.Monad.Trans.Class (lift)
24-
import Data.ByteString (ByteString)
25-
import Data.Int (Int64)
12+
import Control.Concurrent (threadDelay)
13+
import Control.Concurrent.Async.Lifted (async)
14+
import Control.Concurrent.STM.TVar
15+
import Control.Exception (SomeException)
16+
import Control.Monad (forever, mzero, unless, void,
17+
when)
18+
import Control.Monad.Catch (MonadCatch, try)
19+
import Control.Monad.IO.Class (MonadIO (..))
20+
import Control.Monad.STM (atomically)
21+
import Control.Monad.Trans.Class (lift)
22+
import Control.Monad.Trans.Control (MonadBaseControl)
23+
import Control.Monad.Trans.Maybe (runMaybeT)
24+
import Control.Monad.Trans.Reader
25+
import Control.Monad.Trans.State (StateT, evalStateT, get, gets)
26+
import Data.ByteString (ByteString)
27+
import Data.Either (isLeft)
28+
import Data.Int (Int64)
29+
import Network.Socket (Socket, accept)
30+
import qualified Network.Socket as Socket (close)
2631
import Periodic.Connection
27-
import qualified Periodic.Connection as Conn
28-
import Periodic.IOHashMap (IOHashMap, newIOHashMap)
29-
import qualified Periodic.IOHashMap as HM
32+
import qualified Periodic.Connection as Conn
33+
import Periodic.IOHashMap (IOHashMap, newIOHashMap)
34+
import qualified Periodic.IOHashMap as HM
3035
import Periodic.Server.Client
31-
import qualified Periodic.Server.Client as Client
36+
import qualified Periodic.Server.Client as Client
3237
import Periodic.Server.Scheduler
3338
import Periodic.Server.Worker
34-
import qualified Periodic.Server.Worker as Worker
35-
import Periodic.Transport (Transport)
36-
import Periodic.Types (ClientType (..), runParser)
37-
import Periodic.Utils (getEpochTime, tryIO)
38-
import System.Log.Logger (errorM)
39+
import qualified Periodic.Server.Worker as Worker
40+
import Periodic.Transport (Transport)
41+
import Periodic.Types (ClientType (..), runParser)
42+
import Periodic.Utils (getEpochTime)
43+
import System.Log.Logger (errorM)
44+
45+
type ClientList m = IOHashMap ByteString (ClientEnv m)
46+
type WorkerList m = IOHashMap ByteString (WorkerEnv m)
47+
48+
data ServerConfig = ServerConfig
49+
{ schedConfig :: SchedConfig
50+
, mkTransport :: (Socket -> IO Transport)
51+
, serveSock :: Socket
52+
}
53+
54+
data ServerState m = ServerState
55+
{ clientList :: ClientList m
56+
, workerList :: WorkerList m
57+
, schedState :: SchedState m
58+
, serveState :: TVar Bool
59+
}
60+
61+
type ServerT m = StateT (ServerState m) (ReaderT ServerConfig (SchedT m))
62+
63+
runServerT :: Monad m => ServerState m -> ServerConfig -> ServerT m a -> m a
64+
runServerT serverState serverConfig =
65+
runSchedT (schedState serverState) (schedConfig serverConfig) .
66+
flip runReaderT serverConfig . flip evalStateT serverState
67+
68+
runSchedT' :: Monad m => SchedT m a -> ServerT m a
69+
runSchedT' = lift . lift
70+
71+
initServerConfig :: SchedConfig -> (Socket -> IO Transport) -> Socket -> ServerConfig
72+
initServerConfig = ServerConfig
73+
74+
initServerState :: TVar Bool -> IO (ServerState m)
75+
initServerState serveState = do
76+
clientList <- newIOHashMap
77+
workerList <- newIOHashMap
78+
schedState <- initSchedState
79+
pure ServerState{..}
3980

40-
type ClientList = IOHashMap ByteString (ClientEnv IO)
41-
type WorkerList = IOHashMap ByteString (WorkerEnv IO)
81+
serveForever
82+
:: (MonadIO m, MonadBaseControl IO m, MonadCatch m)
83+
=> ServerT m ()
84+
serveForever = do
85+
runSchedT' startSchedT
4286

43-
handleExit :: MVar () -> IO ()
44-
handleExit mv = void $ tryPutMVar mv ()
87+
liftS4 . flip runCheckClientState 100 =<< gets clientList
88+
liftS4 . flip runCheckWorkerState 100 =<< gets workerList
4589

46-
startServer :: (Socket -> IO Transport) -> FilePath -> Socket -> IO ()
47-
startServer makeTransport storePath sock = do
48-
-- Handle dying
49-
bye <- newEmptyMVar
50-
void $ installHandler sigTERM (Catch $ handleExit bye) Nothing
51-
void $ installHandler sigINT (Catch $ handleExit bye) Nothing
90+
state <- gets serveState
5291

53-
schedConfig <- initSchedConfig storePath $ handleExit bye
54-
schedState <- initSchedState
92+
void . runMaybeT . forever $ do
93+
e <- lift tryServeOnce
94+
when (isLeft e) $ mzero
95+
alive <- liftIO $ readTVarIO state
96+
unless alive mzero
5597

56-
runSchedT schedState schedConfig startSchedT
5798

58-
clientList <- newIOHashMap
59-
workerList <- newIOHashMap
99+
runSchedT' shutdown
100+
liftIO . Socket.close =<< lift (asks serveSock)
60101

61-
runCheckClientState clientList 100
62-
runCheckWorkerState workerList 100
63-
64-
thread <- forkIO $ forever $ do
65-
-- if accept failed exit
66-
e <- tryIO $ mainLoop makeTransport sock schedState schedConfig clientList workerList
67-
case e of
68-
Right _ -> return ()
69-
Left e' -> do
70-
print e'
71-
handleExit bye
72-
73-
takeMVar bye
74-
killThread thread
75-
runSchedT schedState schedConfig shutdown
76-
Socket.close sock
77-
78-
mainLoop
79-
:: (Socket -> IO Transport)
80-
-> Socket
81-
-> SchedState IO -> SchedConfig
82-
-> ClientList -> WorkerList -> IO ()
83-
mainLoop makeTransport sock schedState schedConfig clientList workerList = do
84-
(sock', _) <- accept sock
85-
void $ forkIO $ handleConnection schedState schedConfig clientList workerList =<< makeTransport sock'
102+
tryServeOnce
103+
:: (MonadIO m, MonadBaseControl IO m, MonadCatch m)
104+
=> ServerT m (Either SomeException ())
105+
tryServeOnce = try serveOnce
106+
107+
serveOnce
108+
:: (MonadIO m, MonadBaseControl IO m, MonadCatch m)
109+
=> ServerT m ()
110+
serveOnce = do
111+
(sock', _) <- liftIO . accept =<< lift (asks serveSock)
112+
makeTransport <- lift (asks mkTransport)
113+
void $ async $ handleConnection =<< liftIO (makeTransport sock')
86114

87115
handleConnection
88-
:: SchedState IO -> SchedConfig
89-
-> ClientList -> WorkerList -> Transport -> IO ()
90-
handleConnection schedState schedConfig clientList workerList transport = do
91-
connectionConfig <- initServerConnectionConfig transport
92-
connectionState <- initConnectionState
93-
94-
runConnectionT connectionState connectionConfig $
95-
receiveThen $ \pl ->
96-
sendThen $
97-
case runParser pl of
98-
Left _ -> Conn.close
99-
Right TypeClient -> do
100-
cid <- connid
101-
liftIO $ do
102-
clientEnv <- initClientEnv connectionState connectionConfig schedState schedConfig
103-
HM.insert clientList cid clientEnv
104-
startClientT clientEnv
105-
HM.delete clientList cid
106-
Right TypeWorker -> do
107-
cid <- connid
108-
liftIO $ do
109-
workerEnv <- initWorkerEnv connectionState connectionConfig schedState schedConfig
110-
HM.insert workerList cid workerEnv
111-
startWorkerT workerEnv
112-
HM.delete workerList cid
113-
114-
where receiveThen :: (ByteString -> ConnectionT IO ()) -> ConnectionT IO ()
116+
:: (MonadIO m, MonadBaseControl IO m, MonadCatch m)
117+
=> Transport -> ServerT m ()
118+
handleConnection transport = do
119+
connectionConfig <- liftIO $ initServerConnectionConfig transport
120+
connectionState <- liftIO $ initConnectionState
121+
122+
ServerState{..} <- get
123+
ServerConfig{..} <- lift ask
124+
125+
lift . lift . runConnectionT connectionState connectionConfig $
126+
receiveThen $ \pl ->
127+
sendThen $
128+
case runParser pl of
129+
Left _ -> Conn.close
130+
Right TypeClient -> do
131+
cid <- connid
132+
clientEnv <- liftC4 $ initClientEnv
133+
connectionState connectionConfig schedState schedConfig
134+
liftIO $ HM.insert clientList cid clientEnv
135+
liftC4 $ startClientT clientEnv
136+
liftIO $ HM.delete clientList cid
137+
Right TypeWorker -> do
138+
cid <- connid
139+
workerEnv <- liftC4 $ initWorkerEnv
140+
connectionState connectionConfig schedState schedConfig
141+
liftIO $ HM.insert workerList cid workerEnv
142+
liftC4 $ startWorkerT workerEnv
143+
liftIO $ HM.delete workerList cid
144+
145+
where receiveThen
146+
:: (MonadIO m, MonadCatch m)
147+
=> (ByteString -> ConnectionT (SchedT m) ()) -> ConnectionT (SchedT m) ()
115148
receiveThen next = do
116149
e <- try receive
117150
case e of
118151
Left (_ :: SomeException) -> Conn.close
119152
Right pl -> next pl
120153

121-
sendThen :: ConnectionT IO () -> ConnectionT IO ()
154+
sendThen
155+
:: (MonadIO m, MonadCatch m)
156+
=> ConnectionT (SchedT m) () -> ConnectionT (SchedT m) ()
122157
sendThen next = do
123158
e <- try $ send =<< connid
124159
case e of
125160
Left (_ :: SomeException) -> Conn.close
126161
Right _ -> next
127162

128-
runCheckWorkerState :: WorkerList -> Int64 -> IO ()
163+
liftC4 :: Monad m => m a -> ConnectionT (SchedT m) a
164+
liftC4 = lift . lift . lift . lift
165+
166+
liftS4 :: Monad m => m a -> ServerT m a
167+
liftS4 = lift . lift . lift . lift
168+
169+
runCheckWorkerState
170+
:: (MonadIO m, MonadBaseControl IO m)
171+
=> WorkerList m -> Int64 -> m ()
129172
runCheckWorkerState ref alive = runCheckState "Worker" ref (checkWorkerState ref alive) alive
130173

131-
checkWorkerState :: WorkerList -> Int64 -> WorkerEnv IO -> IO ()
174+
checkWorkerState :: MonadIO m => WorkerList m -> Int64 -> WorkerEnv m -> m ()
132175
checkWorkerState ref alive env0 = runWorkerT env0 $ do
133176
expiredAt <- (alive +) <$> Worker.getLastVist
134177
now <- liftIO getEpochTime
@@ -137,10 +180,12 @@ checkWorkerState ref alive env0 = runWorkerT env0 $ do
137180
wid <- lift $ lift connid
138181
liftIO $ HM.delete ref wid
139182

140-
runCheckClientState :: ClientList -> Int64 -> IO ()
183+
runCheckClientState
184+
:: (MonadIO m, MonadBaseControl IO m)
185+
=> ClientList m -> Int64 -> m ()
141186
runCheckClientState ref alive = runCheckState "Client" ref (checkClientState ref alive) alive
142187

143-
checkClientState :: ClientList -> Int64 -> ClientEnv IO -> IO ()
188+
checkClientState :: MonadIO m => ClientList m -> Int64 -> ClientEnv m -> m ()
144189
checkClientState ref alive env0 = runClientT env0 $ do
145190
expiredAt <- (alive +) <$> Client.getLastVist
146191
now <- liftIO getEpochTime
@@ -149,9 +194,21 @@ checkClientState ref alive env0 = runClientT env0 $ do
149194
cid <- lift $ lift connid
150195
liftIO $ HM.delete ref cid
151196

152-
runCheckState :: String -> IOHashMap a b -> (b -> IO ()) -> Int64 -> IO ()
153-
runCheckState var ref checkAlive alive = void . forkIO . forever $ do
154-
threadDelay $ fromIntegral alive * 1000 * 1000
155-
mapM_ checkAlive =<< HM.elems ref
156-
size <- HM.size ref
157-
errorM "Periodic.Server" $ "Total " ++ var ++ ": " ++ show size
197+
runCheckState
198+
:: (MonadIO m, MonadBaseControl IO m)
199+
=> String -> IOHashMap a b -> (b -> m ()) -> Int64 -> m ()
200+
runCheckState var ref checkAlive alive = void . async . forever $ do
201+
liftIO $ threadDelay $ fromIntegral alive * 1000 * 1000
202+
mapM_ checkAlive =<< liftIO (HM.elems ref)
203+
size <- liftIO $ HM.size ref
204+
liftIO $ errorM "Periodic.Server" $ "Total " ++ var ++ ": " ++ show size
205+
206+
207+
startServer :: (Socket -> IO Transport) -> FilePath -> Socket -> IO ()
208+
startServer mk path sock = do
209+
state <- newTVarIO True
210+
schedConfig <- initSchedConfig path $ atomically $ writeTVar state False
211+
let serverConfig = initServerConfig schedConfig mk sock
212+
213+
serverState <- initServerState state
214+
runServerT serverState serverConfig serveForever

0 commit comments

Comments
 (0)