Rename a few concurrent APIs and fix documentation

adithyaov committed Feb 5, 2025
1 parent c1b69b9 commit 023b4ea
Showing 11 changed files with 142 additions and 109 deletions.
4 changes: 2 additions & 2 deletions benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs
@@ -56,7 +56,7 @@ o_n_heap_buffering :: Int -> (Config -> Config) -> [Benchmark]
o_n_heap_buffering value f =
[ bgroup "buffered"
[ benchIOSink value "mkAsync"
- (Stream.fold Fold.drain . Async.parEval f)
+ (Stream.fold Fold.drain . Async.parBuffered f)
]
]

@@ -174,7 +174,7 @@ o_1_space_concatMap label value f =
toNullAp :: (Config -> Config) -> Int -> Int -> IO ()
toNullAp f linearCount start =
Stream.fold Fold.drain
- $ Async.parApply f
+ $ Async.parCrossApply f
(fmap (+) (sourceUnfoldrM nestedCount2 start))
(sourceUnfoldrM nestedCount2 start)

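For context, this commit renames `parEval` to `parBuffered` and `parApply` to `parCrossApply` on streams (the zipping use in the FAQ below becomes `parZipWith`), and `fromCallback` to `parSetCallback`. A minimal sketch of the renamed `parCrossApply`, assuming the rename keeps `parApply`'s shape (a `Config` modifier, a stream of functions, a stream of arguments), as the benchmark change above suggests:

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream

-- Concurrent cross product: each function from the first stream is
-- applied to every element of the second stream, with both streams
-- evaluated concurrently.
main :: IO ()
main = do
    xs <- Stream.fold Fold.toList
            $ Stream.parCrossApply id
                (fmap (+) (Stream.fromList [10, 20]))  -- stream of functions
                (Stream.fromList [1, 2, 3 :: Int])     -- stream of arguments
    print xs -- e.g. [11,12,13,21,22,23]; ordering may vary with concurrency
```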
40 changes: 20 additions & 20 deletions benchmark/Streamly/Benchmark/Data/Stream/Rate.hs
@@ -36,7 +36,7 @@ avgRate :: MonadAsync m =>
avgRate f cfg value rt = f (Stream.avgRate rt . cfg) . sourceUnfoldrM value

{-
- -- parEval should be maxThreads 1 anyway
+ -- parBuffered should be maxThreads 1 anyway
{-# INLINE avgRateThreads1 #-}
avgRateThreads1 :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
@@ -68,50 +68,50 @@ constRate f cfg value rt = f (Stream.constRate rt . cfg) . sourceUnfoldrM value
o_1_space_async :: Int -> [Benchmark]
o_1_space_async value =
[ bgroup
"default/parEval"
"default/parBuffered"
[ bgroup
"avgRate"
-- benchIO "unfoldr" $ toNull
[ benchIOSrc "baseline" (Stream.parEval id . sourceUnfoldrM value)
, benchIOSrc "Nothing" $ rateNothing Stream.parEval id value
, benchIOSrc "1M" $ avgRate Stream.parEval id value 1000000
, benchIOSrc "3M" $ avgRate Stream.parEval id value 3000000
-- , benchIOSrc "10M/maxThreads1" $ avgRateThreads1 Stream.parEval value 10000000
, benchIOSrc "10M" $ avgRate Stream.parEval id value 10000000
, benchIOSrc "20M" $ avgRate Stream.parEval id value 20000000
[ benchIOSrc "baseline" (Stream.parBuffered id . sourceUnfoldrM value)
, benchIOSrc "Nothing" $ rateNothing Stream.parBuffered id value
, benchIOSrc "1M" $ avgRate Stream.parBuffered id value 1000000
, benchIOSrc "3M" $ avgRate Stream.parBuffered id value 3000000
-- , benchIOSrc "10M/maxThreads1" $ avgRateThreads1 Stream.parBuffered value 10000000
, benchIOSrc "10M" $ avgRate Stream.parBuffered id value 10000000
, benchIOSrc "20M" $ avgRate Stream.parBuffered id value 20000000
]
, bgroup
"minRate"
[ benchIOSrc "1M" $ minRate Stream.parEval id value 1000000
, benchIOSrc "10M" $ minRate Stream.parEval id value 10000000
, benchIOSrc "20M" $ minRate Stream.parEval id value 20000000
[ benchIOSrc "1M" $ minRate Stream.parBuffered id value 1000000
, benchIOSrc "10M" $ minRate Stream.parBuffered id value 10000000
, benchIOSrc "20M" $ minRate Stream.parBuffered id value 20000000
]
, bgroup
"maxRate"
[ -- benchIOSrc "10K" $ maxRate value 10000
benchIOSrc "10M" $ maxRate Stream.parEval id value 10000000
benchIOSrc "10M" $ maxRate Stream.parBuffered id value 10000000
]
, bgroup
"constRate"
[ -- benchIOSrc "10K" $ constRate value 10000
benchIOSrc "1M" $ constRate Stream.parEval id value 1000000
, benchIOSrc "10M" $ constRate Stream.parEval id value 10000000
benchIOSrc "1M" $ constRate Stream.parBuffered id value 1000000
, benchIOSrc "10M" $ constRate Stream.parBuffered id value 10000000
]
]
]

o_1_space_ahead :: Int -> [Benchmark]
o_1_space_ahead value =
[ bgroup
"ordered/parEval"
"ordered/parBuffered"
[ benchIOSrc "avgRate/1M"
- $ avgRate Stream.parEval (Stream.ordered True) value 1000000
+ $ avgRate Stream.parBuffered (Stream.ordered True) value 1000000
, benchIOSrc "minRate/1M"
- $ minRate Stream.parEval (Stream.ordered True) value 1000000
+ $ minRate Stream.parBuffered (Stream.ordered True) value 1000000
, benchIOSrc "maxRate/1M"
- $ maxRate Stream.parEval (Stream.ordered True) value 1000000
+ $ maxRate Stream.parBuffered (Stream.ordered True) value 1000000
, benchIOSrc "constRate/1M"
- $ constRate Stream.parEval (Stream.ordered True) value 1000000
+ $ constRate Stream.parBuffered (Stream.ordered True) value 1000000
]
]

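The rate benchmarks above combine `parBuffered` with a `Config` rate modifier. A small sketch of the same pattern outside the benchmark harness, assuming `avgRate` is the `Double -> Config -> Config` modifier the benchmark code composes with:

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream

-- Evaluate the source concurrently, throttled to roughly 10 elements
-- per second on average.
main :: IO ()
main =
    Stream.fold Fold.drain
        $ Stream.parBuffered (Stream.avgRate 10)
        $ Stream.replicateM 30 (pure ())
```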
8 changes: 4 additions & 4 deletions docs/User/HowTo/faq.md
@@ -27,13 +27,13 @@ f1 x =
& Stream.fold Fold.toList -- Fold to list
```

- Use `parApply` to zip streams concurrently. Here, we zip three singleton
+ Use `parZipWith` to zip streams concurrently. Here, we zip three singleton
streams:

```haskell ghci

f2 x =
-     let app = Stream.parApply id
+     let app = Stream.parZipWith id ($)
in (,,)
`fmap` Stream.fromEffect (return $ show x)
`app` Stream.fromEffect (return $ x + 1)
@@ -61,7 +61,7 @@ transformations using the distribute/zip operations.
[Just ("1",2,0.5),Just ("2",3,1.0),Just ("3",4,1.5),Just ("4",5,2.0)]
```

- Instead of using `parApply` directly, you can use `mkZipType` to
+ Instead of using `parZipWith` directly, you can use `mkZipType` to
create a zip Applicative newtype so that you can use the `Applicative`
instance.

@@ -73,7 +73,7 @@ instance.

import Streamly.Internal.Data.Stream.TypeGen

- app = parApply id
+ app = parZipWith id ($)
$(mkZippingType "ZipConcurrent" "app" True)
```

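Unlike the cross-product `parCrossApply`, `parZipWith` pairs the streams element-wise, so output order follows the inputs. A self-contained sketch, assuming the `parZipWith id ($)` usage above generalizes to an arbitrary zip function:

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream

-- Zip two streams concurrently: each input stream is evaluated in its
-- own worker while the zip consumes from both buffers.
main :: IO ()
main = do
    xs <- Stream.fold Fold.toList
            $ Stream.parZipWith id (+)
                (Stream.fromList [1, 2, 3 :: Int])
                (Stream.fromList [10, 20, 30])
    print xs -- [11,22,33]
```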
4 changes: 2 additions & 2 deletions src/Streamly/Data/Stream/MkType.hs
@@ -66,8 +66,8 @@
-- For 'Streamly.Prelude.ZipAsync' concurrent zipping applicative type:
--
-- >>> :{
- -- parApply = Stream.parApply id
- -- $(mkZipType "ZipAsync" "parApply" True)
+ -- parCrossApply = Stream.parCrossApply id
+ -- $(mkZipType "ZipAsync" "parCrossApply" True)
-- :}
--
-- Instead of using these macros directly you could use the generated code as
19 changes: 11 additions & 8 deletions src/Streamly/Data/Stream/Prelude.hs
@@ -64,13 +64,13 @@ module Streamly.Data.Stream.Prelude
-- | Evaluate a stream as a whole concurrently with respect to the consumer
-- of the stream.

-     , parEval
+     , parBuffered

-- *** Generate
-- | Generate a stream by evaluating multiple actions concurrently.
, parRepeatM
, parReplicateM
-     , fromCallback
+     , parSetCallback

-- *** Map
-- | Map actions on a stream such that the mapped actions are evaluated
@@ -98,7 +98,7 @@ module Streamly.Data.Stream.Prelude
-- *** Stream of streams
-- **** Apply

-     , parApply
+     , parCrossApply

-- **** Concat
-- | Shares a single channel across many streams.
@@ -138,6 +138,9 @@

-- ** Deprecated
, tapCount
+     , parEval
+     , fromCallback
+     , parApply
)
where

@@ -156,15 +159,15 @@ import Streamly.Internal.Data.Stream.Prelude
--
-- == Primitives
--
- -- There are only a few fundamental abstractions for concurrency, 'parEval',
+ -- There are only a few fundamental abstractions for concurrency, 'parBuffered',
-- 'parConcatMap', and 'parConcatIterate', all concurrency combinators can be
-- expressed in terms of these.
--
- -- 'parEval' evaluates a stream as a whole asynchronously with respect to
+ -- 'parBuffered' evaluates a stream as a whole asynchronously with respect to
-- the consumer of the stream. A worker thread evaluates multiple elements of
-- the stream ahead of time and buffers the results; the consumer of the stream
-- runs in another thread consuming the elements from the buffer, thus
- -- decoupling the production and consumption of the stream. 'parEval' can be
+ -- decoupling the production and consumption of the stream. 'parBuffered' can be
-- used to run different stages of a pipeline concurrently.
--
-- 'parConcatMap' is used to evaluate multiple actions in a stream concurrently
@@ -217,8 +220,8 @@ import Streamly.Internal.Data.Stream.Prelude
-- Using the few fundamental concurrency primitives we can implement all the
-- usual streaming combinators with concurrent behavior. Combinators like
-- 'unfoldrM', 'iterateM' that are inherently serial can be evaluated
- -- concurrently with respect to the consumer pipeline using 'parEval'.
- -- Combinators like 'zipWithM', 'mergeByM' can also use 'parEval' on the input
+ -- concurrently with respect to the consumer pipeline using 'parBuffered'.
+ -- Combinators like 'zipWithM', 'mergeByM' can also use 'parBuffered' on the input
-- streams to evaluate them concurrently before combining.
--
-- Combinators like 'repeatM', 'replicateM', 'fromListM', 'sequence', 'mapM' in
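A sketch of the pipeline decoupling described in the docs above, with a delay standing in for real per-element work; the transformation stage runs ahead in a worker thread while the consumer drains the buffer:

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream

-- The mapM stage below parBuffered runs in a worker thread, buffering
-- results ahead of time; print runs in the consumer thread, so
-- production and consumption overlap.
main :: IO ()
main =
    Stream.fold Fold.drain
        $ Stream.mapM print
        $ Stream.parBuffered id
        $ Stream.mapM (\x -> threadDelay 100000 >> pure (x * 2))
        $ Stream.fromList [1 .. 10 :: Int]
```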
50 changes: 28 additions & 22 deletions src/Streamly/Internal/Data/Fold/Concurrent.hs
@@ -8,27 +8,27 @@
--
-- = Asynchronous Evaluation
--
- -- Using 'parEval' a fold can be decoupled from the driver and evaluated
+ -- Using 'parBuffered' a fold can be decoupled from the driver and evaluated
-- concurrently with the driver. The driver just pushes an element to the
-- fold's buffer and waits for async evaluation to finish.
--
- -- Stages in a fold pipeline can be made concurrent using 'parEval'.
+ -- Stages in a fold pipeline can be made concurrent using 'parBuffered'.
--
-- = Concurrent Fold Combinators
--
- -- The 'demux' combinator can be made concurrent by using 'parEval' on the fold
+ -- The 'demux' combinator can be made concurrent by using 'parBuffered' on the fold
-- returned by the fold-generating function. Thus, we can fold values for each
-- key in the input stream concurrently.
--
- -- Similarly, we can use 'parEval' with other combinators like 'toMap',
+ -- Similarly, we can use 'parBuffered' with other combinators like 'toMap',
-- 'demuxToMap', 'classify', 'tee', 'distribute', 'partition' etc. Basically,
-- any combinator that composes multiple folds or multiple instances of a fold
-- is a good candidate for running folds concurrently.
--
-- = Finalization
--
-- Before a fold returns "done" it has to drain the child folds. For example,
-- consider a "take" operation on a `parEval` fold, the take should return as
-- consider a "take" operation on a `parBuffered` fold, the take should return as
-- soon as it has taken required number of elements but we have to ensure that
-- any asynchronous child folds finish before it returns. This is achieved by
-- calling the "final" operation of the fold.
@@ -47,18 +47,22 @@

module Streamly.Internal.Data.Fold.Concurrent
(
-       parEval
+       parBuffered
, parLmapM
, parTeeWith
, parDistribute
, parPartition
, parUnzipWithM
, parDistributeScan
, parDemuxScan

+     -- Deprecated
+     , parEval
)
where

#include "inline.hs"
#include "deprecation.h"

import Control.Concurrent (newEmptyMVar, takeMVar, throwTo)
import Control.Monad.Catch (throwM)
@@ -90,9 +94,9 @@ import Streamly.Internal.Data.Channel.Types
-- Evaluating a Fold
-------------------------------------------------------------------------------

- -- | 'parEval' introduces a concurrent stage at the input of the fold. The
+ -- | 'parBuffered' introduces a concurrent stage at the input of the fold. The
-- inputs are asynchronously queued in a buffer and evaluated concurrently with
- -- the evaluation of the source stream. On finalization, 'parEval' waits for
+ -- the evaluation of the source stream. On finalization, 'parBuffered' waits for
-- the asynchronous fold to complete before it returns.
--
-- In the following example both the stream and the fold have a 1 second delay,
@@ -101,7 +105,7 @@ import Streamly.Internal.Data.Channel.Types
-- >>> delay x = threadDelay 1000000 >> print x >> return x
--
-- >>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
- -- >>> dst = Fold.parEval id (Fold.lmapM delay Fold.sum)
+ -- >>> dst = Fold.parBuffered id (Fold.lmapM delay Fold.sum)
-- >>> Stream.fold dst src
-- ...
--
@@ -110,9 +114,10 @@ import Streamly.Internal.Data.Channel.Types
-- >>> Stream.toList $ Stream.groupsOf 4 dst src
-- ...
--
- {-# INLINABLE parEval #-}
- parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
- parEval modifier f =
+ {-# INLINABLE parBuffered #-}
+ parBuffered, parEval
+     :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
+ parBuffered modifier f =
Fold step initial extract final

where
@@ -141,7 +146,7 @@ parEval modifier f =
-- events and tick events, latter are guaranteed to arrive.
--
-- XXX We can use the config to indicate if the fold is a scanning type or
-     -- one-shot, or use a separate parEvalScan for scanning. For a scanning
+     -- one-shot, or use a separate parBufferedScan for scanning. For a scanning
-- type fold the worker would always send the intermediate values back to
-- the driver. An intermediate value can be returned on an input, or the
-- driver can poll even without input, if we have the Skip input support.
@@ -173,13 +178,14 @@ parEval modifier f =
$ withDiagMVar
(svarInspectMode chan)
(dumpChannel chan)
"parEval: waiting to drain"
"parBuffered: waiting to drain"
$ takeMVar (outputDoorBell chan)
-- XXX remove recursion
final chan
Just b -> do
cleanup chan
return b
+ RENAME(parEval,parBuffered)

-- XXX We can have a lconcatMap (unfoldMany) to expand the chunks in the input
-- to streams before folding. This will require an input Skip constructor. In
@@ -198,7 +204,7 @@ parLmapM = undefined
--
-- Definition:
--
- -- >>> parTeeWith cfg f c1 c2 = Fold.teeWith f (Fold.parEval cfg c1) (Fold.parEval cfg c2)
+ -- >>> parTeeWith cfg f c1 c2 = Fold.teeWith f (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2)
--
-- Example:
--
@@ -216,13 +222,13 @@ parTeeWith :: MonadAsync m =>
-> Fold m x a
-> Fold m x b
-> Fold m x c
- parTeeWith cfg f c1 c2 = Fold.teeWith f (parEval cfg c1) (parEval cfg c2)
+ parTeeWith cfg f c1 c2 = Fold.teeWith f (parBuffered cfg c1) (parBuffered cfg c2)

-- | Distribute the input to all the folds in the supplied list concurrently.
--
-- Definition:
--
- -- >>> parDistribute cfg = Fold.distribute . fmap (Fold.parEval cfg)
+ -- >>> parDistribute cfg = Fold.distribute . fmap (Fold.parBuffered cfg)
--
-- Example:
--
@@ -235,14 +241,14 @@ parTeeWith cfg f c1 c2 = Fold.teeWith f (parEval cfg c1) (parEval cfg c2)
{-# INLINABLE parDistribute #-}
parDistribute :: MonadAsync m =>
(Config -> Config) -> [Fold m a b] -> Fold m a [b]
- parDistribute cfg = Fold.distribute . fmap (parEval cfg)
+ parDistribute cfg = Fold.distribute . fmap (parBuffered cfg)

-- | Select first fold for Left input and second for Right input. Both folds
-- run concurrently.
--
-- Definition
--
- -- >>> parPartition cfg c1 c2 = Fold.partition (Fold.parEval cfg c1) (Fold.parEval cfg c2)
+ -- >>> parPartition cfg c1 c2 = Fold.partition (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2)
--
-- Example:
--
@@ -256,14 +262,14 @@ parDistribute cfg = Fold.distribute . fmap (parEval cfg)
{-# INLINABLE parPartition #-}
parPartition :: MonadAsync m =>
(Config -> Config) -> Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y)
- parPartition cfg c1 c2 = Fold.partition (parEval cfg c1) (parEval cfg c2)
+ parPartition cfg c1 c2 = Fold.partition (parBuffered cfg c1) (parBuffered cfg c2)

-- | Split and distribute the output to two different folds and then zip the
-- results. Both the consumer folds run concurrently.
--
-- Definition
--
- -- >>> parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (Fold.parEval cfg c1) (Fold.parEval cfg c2)
+ -- >>> parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2)
--
-- Example:
--
@@ -277,7 +283,7 @@ parPartition cfg c1 c2 = Fold.partition (parEval cfg c1) (parEval cfg c2)
{-# INLINABLE parUnzipWithM #-}
parUnzipWithM :: MonadAsync m
=> (Config -> Config) -> (a -> m (b,c)) -> Fold m b x -> Fold m c y -> Fold m a (x,y)
- parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (parEval cfg c1) (parEval cfg c2)
+ parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (parBuffered cfg c1) (parBuffered cfg c2)

-- There are two ways to implement a concurrent scan.
--
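A short sketch tying the renamed fold combinators together: `parTeeWith`, defined above in terms of `parBuffered`, folds one input with two concurrent folds. The import path assumes the module layout in this commit:

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import Streamly.Internal.Data.Fold.Concurrent (parTeeWith)

-- Sum and length computed by two folds running concurrently; the
-- driver pushes each input element to both fold buffers.
main :: IO ()
main = do
    r <- Stream.fold
            (parTeeWith id (,) Fold.sum Fold.length)
            (Stream.fromList [1 .. 1000 :: Int])
    print r -- (500500,1000)
```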