Skip to content

Commit

Permalink
shared-kernel/fix:#187/Added Redis Sorted Set Functions and Redis Str…
Browse files Browse the repository at this point in the history
…eams Functions for Job Scheduling --
  • Loading branch information
suraj.kumar1 committed Aug 1, 2023
1 parent 62e5236 commit b05aa40
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 15 deletions.
3 changes: 3 additions & 0 deletions lib/mobility-core/src/Kernel/Mock/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ instance CoreMetrics (MockM e) where
incrementErrorCounter _ _ = return ()
addUrlCallRetries _ _ = return ()
addUrlCallRetryFailures _ = return ()
incrementSortedSetCounter _ _ = return ()
incrementStreamCounter _ _ = return ()
addGenericLatency _ _ = return ()

instance MonadTime (MockM e) where
getCurrentTime = liftIO getCurrentTime
Expand Down
42 changes: 34 additions & 8 deletions lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import qualified Data.ByteString.Lazy as BSL
import Data.String.Conversions
import Data.Text hiding (concatMap, map, null)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as DE
import Database.Redis (Queued, Redis, RedisTx, Reply, TxResult (..))
import qualified Database.Redis as Hedis
import EulerHS.Prelude (whenLeft)
Expand Down Expand Up @@ -153,7 +152,7 @@ getImpl decodeResult key = withLogTag "Redis" $ do
Just res' -> decodeResult res'

get :: (FromJSON a, HedisFlow m env) => Text -> m (Maybe a)
get key = getImpl decodeResult key
get = getImpl decodeResult
where
decodeResult bs = Error.fromMaybeM (HedisDecodeError $ cs bs) $ Ae.decode $ BSL.fromStrict bs

Expand Down Expand Up @@ -415,7 +414,7 @@ zrevrangeWithscores key start stop = do
pure $ map (\(k, score) -> (cs' k, score)) res
where
cs' :: BS.ByteString -> Text
cs' = DE.decodeUtf8
cs' = cs

zScore :: (FromJSON Double, HedisFlow m env) => Text -> Text -> m (Maybe Double)
zScore key member = do
Expand All @@ -430,6 +429,23 @@ zRevRank key member = do
zCard :: (HedisFlow m env) => Text -> m Integer
zCard key = runWithPrefix key Hedis.zcard

zrangebyscore :: (HedisFlow m env) => Text -> Double -> Double -> m [BS.ByteString]
zrangebyscore key start end = do
preKey <- buildKey key
withLogTag "Redis" $ logDebug $ "working with key : " <> cs preKey
runHedis $ Hedis.zrangebyscore preKey start end

xadd :: (HedisFlow m env) => Text -> Text -> [(BS.ByteString, BS.ByteString)] -> m ()
xadd key entryId fieldValues = withLogTag "Redis" $ do
migrating <- asks (.hedisMigrationStage)
if migrating
then do
res <- withTimeRedis "RedisStandalone" "xadd" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xadd prefKey (cs entryId) fieldValues)
whenLeft res (\err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_xadd" $ show err)
else pure ()
res <- withTimeRedis "RedisCluster" "xadd" $ try @_ @SomeException (runWithPrefix_ key $ \prefKey -> Hedis.xadd prefKey (cs entryId) fieldValues)
whenLeft res (\err -> withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_xadd" $ show err)

zAdd ::
(ToJSON member, HedisFlow m env) =>
Text ->
Expand All @@ -450,7 +466,11 @@ xinfoGroups ::
Text -> -- Stream key
m Bool
xinfoGroups key = do
ls <- runWithPrefix key $ \prefKey -> Hedis.xinfoGroups prefKey
eitherMaybeBS <- withTimeRedis "RedisStandalone" "get" $ try @_ @SomeException (runWithPrefix key Hedis.xinfoGroups)
ls <-
case eitherMaybeBS of
Left err -> logTagInfo "ERROR_WHILE_GET" (show err) $> []
Right maybeBS -> pure maybeBS
return $ not (null ls)

-- Function to create a new consumer group for a stream
Expand All @@ -470,15 +490,21 @@ xreadGroup ::
[(Text, Text)] -> -- (stream, id) pairs
m [(Text, Text)]
xreadGroup groupName consumerName xs = do
let ls = map (\(stream, id) -> (DE.encodeUtf8 stream, DE.encodeUtf8 id)) xs
let ls = map (\(stream, id) -> (cs stream, cs id)) xs
let var = listToMaybe ls
logTagInfo "listToMaybe ls" (show var)
case var of
Just keyVal -> do
res <- runWithPrefix (DE.decodeUtf8 $ fst keyVal) $ \_ -> Hedis.xreadGroup (DE.encodeUtf8 groupName) (DE.encodeUtf8 consumerName) ls
eitherMaybeBS <- withTimeRedis "RedisStandalone" "get" $ try @_ @SomeException (runWithPrefix (cs $ fst keyVal) $ \_ -> Hedis.xreadGroup (cs groupName) (cs consumerName) ls)
res <-
case eitherMaybeBS of
Left err -> logTagInfo "ERROR_WHILE_GET" (show err) $> Nothing
Right maybeBS -> pure maybeBS
case res of
Just messages -> return $ extractKeyValuePairs (concatMap Hedis.records messages)
Just messages -> do
return $ extractKeyValuePairs (concatMap Hedis.records messages)
Nothing -> pure []
Nothing -> pure []

extractKeyValuePairs :: [Hedis.StreamsRecord] -> [(Text, Text)]
extractKeyValuePairs = concatMap (\(Hedis.StreamsRecord _ keyVals) -> map (\(k, v) -> (DE.decodeUtf8 k, DE.decodeUtf8 v)) keyVals)
extractKeyValuePairs = concatMap (\(Hedis.StreamsRecord _ keyVals) -> map (\(k, v) -> (cs k, cs v)) keyVals)
61 changes: 61 additions & 0 deletions lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,64 @@ addUrlCallFailuresImplementation' cmContainers url version = do
urlCallRetriesMetric
(showBaseUrlText url, version.getDeploymentVersion)
P.incCounter

incrementSortedSetCounterImplementation ::
( HasCoreMetrics r,
L.MonadFlow m,
MonadReader r m
) =>
Text ->
Int ->
m ()
incrementSortedSetCounterImplementation context scheduledSecond = do
cmContainer <- asks (.coreMetrics)
version <- asks (.version)
incrementSortedSetCounterImplementation' cmContainer context scheduledSecond version

incrementSortedSetCounterImplementation' :: L.MonadFlow m => CoreMetricsContainer -> Text -> Int -> DeploymentVersion -> m ()
incrementSortedSetCounterImplementation' cmContainers context scheduledSecond version = do
let sortedSetMetric = cmContainers.sortedSetCounter
L.runIO $
P.withLabel
sortedSetMetric
(context, show scheduledSecond, version.getDeploymentVersion)
P.incCounter

incrementStreamCounterImplementation ::
( HasCoreMetrics r,
L.MonadFlow m,
MonadReader r m
) =>
Text ->
Int ->
m ()
incrementStreamCounterImplementation context executedseconds = do
cmContainer <- asks (.coreMetrics)
version <- asks (.version)
incrementStreamCounterImplementation' cmContainer context executedseconds version

incrementStreamCounterImplementation' :: L.MonadFlow m => CoreMetricsContainer -> Text -> Int -> DeploymentVersion -> m ()
incrementStreamCounterImplementation' cmContainers context executedseconds version = do
let sortedSetMetric = cmContainers.sortedSetCounter
L.runIO $
P.withLabel
sortedSetMetric
(context, show executedseconds, version.getDeploymentVersion)
P.incCounter

addGenericLatencyImplementation ::
( HasCoreMetrics r,
L.MonadFlow m,
MonadReader r m
) =>
Text ->
NominalDiffTime ->
m ()
addGenericLatencyImplementation operation latency = do
cmContainer <- asks (.coreMetrics)
version <- asks (.version)
L.runIO $
P.withLabel
cmContainer.genericLatency
(operation, version.getDeploymentVersion)
(`P.observe` (fromIntegral $ div (fromEnum . nominalDiffTimeToSeconds $ latency) 1000000000000))
48 changes: 41 additions & 7 deletions lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ type URLCallRetriesMetric = P.Vector P.Label3 P.Counter

type URLCallRetryFailuresMetric = P.Vector P.Label2 P.Counter

type SortedSetMetric = P.Vector P.Label3 P.Counter

type StreamMetric = P.Vector P.Label3 P.Counter

type GenericLatencyMetric = P.Vector P.Label2 P.Histogram

type HasCoreMetrics r =
( HasField "coreMetrics" r CoreMetricsContainer,
HasField "version" r DeploymentVersion
Expand All @@ -46,32 +52,36 @@ type HasCoreMetrics r =
newtype DeploymentVersion = DeploymentVersion {getDeploymentVersion :: Text}

class CoreMetrics m where
addRequestLatency ::
Text ->
Text ->
Milliseconds ->
Either ClientError a ->
m ()
addRequestLatency :: Text -> Text -> Milliseconds -> Either ClientError a -> m ()
addDatastoreLatency :: Text -> Text -> NominalDiffTime -> m ()
incrementErrorCounter :: Text -> SomeException -> m ()
addUrlCallRetries :: BaseUrl -> Int -> m ()
addUrlCallRetryFailures :: BaseUrl -> m ()
incrementSortedSetCounter :: Text -> Int -> m ()
incrementStreamCounter :: Text -> Int -> m ()
addGenericLatency :: Text -> NominalDiffTime -> m ()

data CoreMetricsContainer = CoreMetricsContainer
{ requestLatency :: RequestLatencyMetric,
datastoresLatency :: DatastoresLatencyMetric,
genericLatency :: GenericLatencyMetric,
errorCounter :: ErrorCounterMetric,
urlCallRetries :: URLCallRetriesMetric,
urlCallRetryFailures :: URLCallRetryFailuresMetric
urlCallRetryFailures :: URLCallRetryFailuresMetric,
sortedSetCounter :: SortedSetMetric,
streamCounter :: StreamMetric
}

registerCoreMetricsContainer :: IO CoreMetricsContainer
registerCoreMetricsContainer = do
requestLatency <- registerRequestLatencyMetric
datastoresLatency <- registerDatastoresLatencyMetrics
genericLatency <- registerGenericLatencyMetrics
errorCounter <- registerErrorCounterMetric
urlCallRetries <- registerURLCallRetriesMetric
urlCallRetryFailures <- registerURLCallRetryFailuresMetric
sortedSetCounter <- registerSortedSetMetric
streamCounter <- registerStreamCounter

return CoreMetricsContainer {..}

Expand Down Expand Up @@ -114,3 +124,27 @@ registerURLCallRetryFailuresMetric =
P.counter info
where
info = P.Info "url_call_retry_failures_counter" ""

registerSortedSetMetric :: IO SortedSetMetric
registerSortedSetMetric =
P.register $
P.vector ("job_type", "scheduled_second", "version") $
P.counter info
where
info = P.Info "sortedset_scheduled_jobs_counter" ""

registerStreamCounter :: IO StreamMetric
registerStreamCounter =
P.register $
P.vector ("job_type", "executed_seconds", "version") $
P.counter info
where
info = P.Info "stream_jobs_counter" ""

registerGenericLatencyMetrics :: IO GenericLatencyMetric
registerGenericLatencyMetrics =
P.register $
P.vector ("operation", "version") $
P.histogram info P.defaultBuckets
where
info = P.Info "producer_operation_duration" ""
3 changes: 3 additions & 0 deletions lib/mobility-core/src/Kernel/Types/Flow.hs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ instance Metrics.HasCoreMetrics r => Metrics.CoreMetrics (FlowR r) where
incrementErrorCounter = Metrics.incrementErrorCounterImplementation
addUrlCallRetries = Metrics.addUrlCallRetriesImplementation
addUrlCallRetryFailures = Metrics.addUrlCallFailuresImplementation
incrementSortedSetCounter = Metrics.incrementSortedSetCounterImplementation
incrementStreamCounter = Metrics.incrementStreamCounterImplementation
addGenericLatency = Metrics.addGenericLatencyImplementation

instance MonadMonitor (FlowR r) where
doIO = liftIO
Expand Down
3 changes: 3 additions & 0 deletions lib/mobility-core/test/src/APIExceptions.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ instance Metrics.CoreMetrics IO where
incrementErrorCounter _ _ = return ()
addUrlCallRetries _ _ = return ()
addUrlCallRetryFailures _ = return ()
incrementSortedSetCounter _ _ = return ()
incrementStreamCounter _ _ = return ()
addGenericLatency _ _ = return ()

httpExceptionTests :: TestTree
httpExceptionTests =
Expand Down

0 comments on commit b05aa40

Please sign in to comment.