Skip to content

Commit 442b084

Browse files
committed
runAgentT and runPeriodicT to ConnectionT
1 parent 81987c0 commit 442b084

File tree

9 files changed

+130
-140
lines changed

9 files changed

+130
-140
lines changed

periodic-client/periodic-client.cabal

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ library
2626
, byteable
2727
, monad-control
2828
, exceptions
29+
, transformers
2930
, lifted-base
3031
, lifted-async
3132
default-language: Haskell2010

periodic-client/src/Periodic/Client.hs

+10-6
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,16 @@ import System.Timeout.Lifted (timeout)
5050
type ClientT m = PeriodicT m ()
5151

5252
data ClientEnv m = ClientEnv
53-
{ periodicEnv :: Env m ()
54-
, periodicState :: PeriodicState
53+
{ periodicEnv :: Env m ()
54+
, periodicState :: PeriodicState
55+
, connectionConfig :: Conn.ConnectionConfig
56+
, connectionState :: Conn.ConnectionState
5557
}
5658

5759
runClientT :: Monad m => ClientEnv m -> ClientT m a -> m a
58-
runClientT ClientEnv {..} = runPeriodicT periodicState periodicEnv
60+
runClientT ClientEnv {..} =
61+
Conn.runConnectionT connectionState connectionConfig
62+
. runPeriodicT periodicState periodicEnv
5963

6064
open
6165
:: (MonadIO m, MonadBaseControl IO m, MonadCatch m, MonadMask m)
@@ -71,9 +75,9 @@ open f h = do
7175
Conn.send $ toBytes TypeClient
7276
void Conn.receive
7377

74-
let env0 = initEnv () connectionConfig
75-
state0 <- liftIO $ initPeriodicState connectionState
76-
let clientEnv = ClientEnv env0 state0
78+
let periodicEnv = initEnv ()
79+
periodicState <- liftIO initPeriodicState
80+
let clientEnv = ClientEnv{..}
7781

7882
runClientT clientEnv $ do
7983
void . liftBaseDiscard forkIO $ forever $ do

periodic-client/src/Periodic/Worker.hs

+14-12
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ module Periodic.Worker
1717

1818
import Control.Monad.Catch (MonadCatch, MonadMask, catch)
1919
import Control.Monad.IO.Class (MonadIO (..))
20+
import Control.Monad.Trans.Class (lift)
2021
import Control.Monad.Trans.Control (MonadBaseControl)
2122
import Data.Byteable (toBytes)
22-
import Periodic.Agent (Agent, readerSize, receive,
23+
import Periodic.Agent (AgentReader, Msgid,
24+
readerSize, receive,
2325
runAgentT, send)
2426
import Periodic.Job (JobConfig, JobT, func_,
2527
initJobConfig, name, workFail)
@@ -64,13 +66,13 @@ runWorkerT f h m = do
6466
Conn.send $ toBytes TypeWorker
6567
void Conn.receive
6668

67-
taskList <- liftIO newIOHashMap
68-
let env0 = initEnv taskList connectionConfig
69-
state0 <- liftIO $ initPeriodicState connectionState
69+
taskList <- liftIO newIOHashMap
70+
let env0 = initEnv taskList
71+
state0 <- liftIO initPeriodicState
7072

71-
runPeriodicT state0 env0 $ do
72-
void $ async startMainLoop
73-
m
73+
runPeriodicT state0 env0 $ do
74+
void $ async startMainLoop
75+
m
7476

7577
close :: MonadIO m => WorkerT m ()
7678
close = stopPeriodicT
@@ -110,9 +112,9 @@ removeFunc f = do
110112

111113
grabJob
112114
:: (MonadIO m, MonadMask m, MonadBaseControl IO m)
113-
=> Agent -> WorkerT m (Maybe JobConfig)
114-
grabJob (agentState, agentConfig) = do
115-
pl <- liftPeriodicT . runAgentT agentState agentConfig $ do
115+
=> Msgid -> AgentReader -> WorkerT m (Maybe JobConfig)
116+
grabJob mid reader = do
117+
pl <- lift . lift . runAgentT reader mid $ do
116118
size <- readerSize
117119
when (size == 0) $ send GrabJob
118120
timeout 10000000 receive
@@ -135,9 +137,9 @@ work_
135137
=> WorkerT m ()
136138
work_ = do
137139
taskList <- env
138-
agent <- newAgent
140+
(mid, reader) <- newAgentEnv
139141
forever $ do
140-
j <- grabJob agent
142+
j <- grabJob mid reader
141143
when (isJust j) $
142144
withEnv (fromJust j) $ do
143145
f <- func_

periodic-common/src/Periodic/Agent.hs

+29-43
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,24 @@
22

33
module Periodic.Agent
44
(
5-
AgentState
6-
, AgentConfig
7-
, AgentList
8-
, Agent
5+
AgentReader
96
, Msgid
7+
, AgentEnv (..)
108
, AgentT
119
, runAgentT
12-
, initAgentConfig
13-
, initAgentState
10+
, mkAgentReader
1411
, send
1512
, send_
1613
, agentid
1714
, msgid
18-
, msgid'
1915
, msgidLength
2016
, aAlive
2117
, feed
2218
, receive
2319
, receive_
2420
, readerSize
25-
, agent
21+
, agentEnv
22+
, runAgentTWithEnv
2623
, liftAgentT
2724
) where
2825

@@ -39,54 +36,43 @@ import Control.Monad (when)
3936
import Control.Monad.IO.Class (MonadIO (..))
4037
import Control.Monad.STM (atomically, retry)
4138
import Control.Monad.Trans.Class (lift)
42-
import Control.Monad.Trans.Reader
39+
import Control.Monad.Trans.Reader (ReaderT, ask, runReaderT)
4340
import Control.Monad.Trans.State (StateT, evalStateT, get)
4441
import Data.Byteable (Byteable (..))
45-
import Periodic.IOHashMap (IOHashMap)
4642
import Periodic.Types.Internal
4743

4844
type AgentReader = TVar [ByteString]
4945
type Msgid = ByteString
5046

51-
data AgentState = AgentState
52-
{ aReader :: AgentReader
53-
, connectionState :: ConnectionState
54-
}
55-
56-
data AgentConfig = AgentConfig
57-
{ aMsgid :: Msgid
47+
data AgentEnv = AgentEnv
48+
{ agentReader :: AgentReader
49+
, agentMsgid :: Msgid
50+
, connectionState :: ConnectionState
5851
, connectionConfig :: ConnectionConfig
5952
}
6053

61-
type Agent = (AgentState, AgentConfig)
62-
type AgentList = IOHashMap Msgid Agent
63-
64-
msgid' :: AgentConfig -> Msgid
65-
msgid' = aMsgid
66-
67-
agentid :: AgentConfig -> ByteString
68-
agentid config = B.concat [aMsgid config, connid' $ connectionConfig config]
54+
agentid :: AgentEnv -> ByteString
55+
agentid AgentEnv{..} = B.concat [agentMsgid, connid' connectionConfig]
6956

7057
type AgentT m = StateT AgentReader (ReaderT Msgid (ConnectionT m))
7158

7259
runAgentT
7360
:: Monad m
74-
=> AgentState
75-
-> AgentConfig
61+
=> AgentReader
62+
-> Msgid
7663
-> AgentT m a
77-
-> m a
78-
runAgentT state config =
79-
runConnectionT (connectionState state) (connectionConfig config)
80-
. flip runReaderT (aMsgid config)
81-
. flip evalStateT (aReader state)
64+
-> ConnectionT m a
65+
runAgentT reader mid =
66+
flip runReaderT mid
67+
. flip evalStateT reader
8268

83-
initAgentState :: ConnectionState -> IO AgentState
84-
initAgentState connectionState = do
85-
aReader <- newTVarIO []
86-
return AgentState{..}
69+
runAgentTWithEnv :: Monad m => AgentEnv -> AgentT m a -> m a
70+
runAgentTWithEnv AgentEnv {..} =
71+
runConnectionT connectionState connectionConfig
72+
. runAgentT agentReader agentMsgid
8773

88-
initAgentConfig :: Msgid -> ConnectionConfig -> AgentConfig
89-
initAgentConfig = AgentConfig
74+
mkAgentReader :: [ByteString] -> IO AgentReader
75+
mkAgentReader = newTVarIO
9076

9177
msgid :: Monad m => AgentT m Msgid
9278
msgid = lift ask
@@ -125,13 +111,13 @@ receive = runParser <$> receive_
125111
readerSize :: MonadIO m => AgentT m Int
126112
readerSize = fmap length $ liftIO . readTVarIO =<< get
127113

128-
agent :: Monad m => AgentT m Agent
129-
agent = do
130-
aReader <- get
131-
aMsgid <- lift ask
114+
agentEnv :: Monad m => AgentT m AgentEnv
115+
agentEnv = do
116+
agentReader <- get
117+
agentMsgid <- lift ask
132118
connectionState <- lift $ lift get
133119
connectionConfig <- lift . lift $ lift ask
134-
pure (AgentState {..}, AgentConfig {..})
120+
pure AgentEnv{..}
135121

136122
liftAgentT :: Monad m => m a -> AgentT m a
137123
liftAgentT = lift . lift . lift . lift

periodic-common/src/Periodic/Monad.hs

+33-43
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ module Periodic.Monad
1616
, runPeriodicT
1717
, startMainLoop
1818
, withAgentT
19-
, newAgent
2019
, liftPeriodicT
2120
, isAlive
2221
, stopPeriodicT
2322
, env
23+
, newAgentEnv
2424
) where
2525

2626
import Control.Concurrent (forkIO)
@@ -34,32 +34,28 @@ import Control.Monad.STM (atomically)
3434
import Control.Monad.Trans.Class (lift)
3535
import Control.Monad.Trans.Control (MonadBaseControl, liftBaseDiscard)
3636
import Control.Monad.Trans.Maybe (runMaybeT)
37-
import Control.Monad.Trans.Reader
37+
import Control.Monad.Trans.Reader (ReaderT, ask, asks, runReaderT)
3838
import Control.Monad.Trans.State (StateT (..), evalStateT, get,
3939
gets)
4040
import Data.ByteString (ByteString)
4141
import qualified Data.ByteString as B (drop, empty, take)
4242
import Periodic.Agent hiding (receive)
43-
import Periodic.Connection (ConnectionConfig, ConnectionState,
44-
ConnectionT, close, receive,
45-
runConnectionT)
46-
import Periodic.IOHashMap (newIOHashMap)
43+
import Periodic.Connection (ConnectionT, close, receive)
44+
import Periodic.IOHashMap (IOHashMap, newIOHashMap)
4745
import qualified Periodic.IOHashMap as HM (delete, elems, insert,
4846
lookup, member)
4947
import System.Entropy (getEntropy)
5048
import System.Log.Logger (errorM)
5149

5250

5351
data Env m u = Env
54-
{ uEnv :: u
55-
, connectionConfig :: ConnectionConfig
56-
, agentHandler :: AgentT m ()
52+
{ uEnv :: u
53+
, agentHandler :: AgentT m ()
5754
}
5855

5956
data PeriodicState = PeriodicState
60-
{ status :: TVar Bool
61-
, agentList :: AgentList
62-
, connectionState :: ConnectionState
57+
{ status :: TVar Bool
58+
, agentList :: IOHashMap Msgid (Msgid, AgentReader)
6359
}
6460

6561
type PeriodicT m u = StateT PeriodicState (ReaderT (Env m u) (ConnectionT m))
@@ -69,16 +65,15 @@ runPeriodicT
6965
=> PeriodicState
7066
-> Env m u
7167
-> PeriodicT m u a
72-
-> m a
68+
-> ConnectionT m a
7369
runPeriodicT state config =
74-
runConnectionT (connectionState state) (connectionConfig config)
75-
. flip runReaderT config
70+
flip runReaderT config
7671
. flip evalStateT state
7772

78-
initEnv :: MonadIO m => u -> ConnectionConfig -> Env m u
79-
initEnv u c = Env u c defaultAgentHandler
73+
initEnv :: MonadIO m => u -> Env m u
74+
initEnv u = Env u defaultAgentHandler
8075

81-
initEnv_ :: u -> ConnectionConfig -> AgentT m () -> Env m u
76+
initEnv_ :: u -> AgentT m () -> Env m u
8277
initEnv_ = Env
8378

8479
defaultAgentHandler :: MonadIO m => AgentT m ()
@@ -90,32 +85,29 @@ withEnv :: (Monad m) => u1 -> PeriodicT m u1 a -> PeriodicT m u a
9085
withEnv u m = do
9186
state0 <- get
9287
env0 <- lift ask
93-
liftPeriodicT $ runPeriodicT state0 (env0 {uEnv=u}) m
88+
lift . lift $ runPeriodicT state0 (env0 {uEnv=u}) m
9489

95-
initPeriodicState :: ConnectionState -> IO PeriodicState
96-
initPeriodicState connectionState = do
90+
initPeriodicState :: IO PeriodicState
91+
initPeriodicState = do
9792
status <- newTVarIO True
9893
agentList <- newIOHashMap
9994
pure PeriodicState{..}
10095

10196
withAgentT :: (MonadIO m, Monad m, MonadMask m) => AgentT m a -> PeriodicT m u a
10297
withAgentT agentT =
10398
bracket newMsgid removeMsgid $ \mid -> do
104-
(agentState, agentConfig) <- newAgentEnv mid
105-
liftPeriodicT $ runAgentT agentState agentConfig agentT
99+
(_, reader) <- newAgentEnv_ mid
100+
lift . lift $ runAgentT reader mid agentT
106101

107-
newAgentEnv :: (Monad m, MonadIO m) => Msgid -> PeriodicT m u Agent
108-
newAgentEnv mid = do
102+
newAgentEnv_ :: (Monad m, MonadIO m) => Msgid -> PeriodicT m u (Msgid, AgentReader)
103+
newAgentEnv_ mid = do
109104
PeriodicState{..} <- get
110-
Env {..} <- lift ask
111-
let agentConfig = initAgentConfig mid connectionConfig
112-
agentState <- liftIO $ initAgentState connectionState
113-
liftIO $ HM.insert agentList mid (agentState, agentConfig)
114-
return (agentState, agentConfig)
115-
116-
newAgent :: (MonadIO m, Monad m) => PeriodicT m u Agent
117-
newAgent = newAgentEnv =<< newMsgid
105+
reader <- liftIO $ mkAgentReader []
106+
liftIO $ HM.insert agentList mid (mid, reader)
107+
return (mid, reader)
118108

109+
newAgentEnv :: (MonadIO m) => PeriodicT m u (Msgid, AgentReader)
110+
newAgentEnv = newAgentEnv_ =<< newMsgid
119111

120112
liftPeriodicT :: (Functor m, Applicative m, Monad m) => m a -> PeriodicT m u a
121113
liftPeriodicT = lift . lift . lift . lift
@@ -162,14 +154,12 @@ doFeed bs = do
162154
Env{..} <- lift ask
163155
v <- liftIO . HM.lookup agentList $ B.take msgidLength bs
164156
case v of
165-
Just (agentState, agentConfig) ->
166-
liftPeriodicT . runAgentT agentState agentConfig . feed $ B.drop msgidLength bs
157+
Just (mid, reader) ->
158+
lift . lift . runAgentT reader mid . feed $ B.drop msgidLength bs
167159
Nothing -> do
168-
let agentConfig = initAgentConfig (B.take msgidLength bs) connectionConfig
169-
agentState <- liftIO $ initAgentState connectionState
170-
liftPeriodicT . runAgentT agentState agentConfig $ do
171-
feed $ B.drop msgidLength bs
172-
agentHandler
160+
let mid = B.take msgidLength bs
161+
reader <- liftIO $ mkAgentReader [B.drop msgidLength bs]
162+
lift . lift $ runAgentT reader mid agentHandler
173163

174164
startMainLoop
175165
:: (MonadIO m, MonadBaseControl IO m, MonadCatch m)
@@ -189,9 +179,9 @@ isAlive = liftIO . readTVarIO =<< gets status
189179
doFeedError :: MonadIO m => PeriodicT m u ()
190180
doFeedError =
191181
gets agentList >>= liftIO . HM.elems >>= mapM_ go
192-
where go :: MonadIO m => (AgentState, AgentConfig) -> PeriodicT m u ()
193-
go (agentState, agentConfig) =
194-
liftPeriodicT $ runAgentT agentState agentConfig $ feed B.empty
182+
where go :: MonadIO m => (Msgid, AgentReader) -> PeriodicT m u ()
183+
go (mid, reader) =
184+
lift . lift $ runAgentT reader mid $ feed B.empty
195185

196186
stopPeriodicT :: MonadIO m => PeriodicT m u ()
197187
stopPeriodicT = do

0 commit comments

Comments
 (0)