From 77202bd7cc68f3c801bf9579f78e8a61428fa6ae Mon Sep 17 00:00:00 2001 From: Adithya Kumar Date: Wed, 5 Feb 2025 22:29:49 +0530 Subject: [PATCH] Rename a few concurrent APIs and fix documentation --- .../Benchmark/Data/Stream/ConcurrentCommon.hs | 4 +- .../Streamly/Benchmark/Data/Stream/Rate.hs | 40 ++++----- docs/User/HowTo/faq.md | 8 +- src/Streamly/Data/Stream/MkType.hs | 4 +- src/Streamly/Data/Stream/Prelude.hs | 19 ++-- src/Streamly/Internal/Data/Fold/Concurrent.hs | 50 ++++++----- .../Internal/Data/Stream/Concurrent.hs | 90 +++++++++++-------- src/Streamly/Internal/Data/Stream/Time.hs | 6 +- test/Streamly/Test/Data/Stream/Concurrent.hs | 20 ++--- 9 files changed, 132 insertions(+), 109 deletions(-) diff --git a/benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs b/benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs index 100d215bea..f48c2f2e2d 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs @@ -56,7 +56,7 @@ o_n_heap_buffering :: Int -> (Config -> Config) -> [Benchmark] o_n_heap_buffering value f = [ bgroup "buffered" [ benchIOSink value "mkAsync" - (Stream.fold Fold.drain . Async.parEval f) + (Stream.fold Fold.drain . Async.parBuffered f) ] ] @@ -174,7 +174,7 @@ o_1_space_concatMap label value f = toNullAp :: (Config -> Config) -> Int -> Int -> IO () toNullAp f linearCount start = Stream.fold Fold.drain - $ Async.parApply f + $ Async.parCrossApply f (fmap (+) (sourceUnfoldrM nestedCount2 start)) (sourceUnfoldrM nestedCount2 start) diff --git a/benchmark/Streamly/Benchmark/Data/Stream/Rate.hs b/benchmark/Streamly/Benchmark/Data/Stream/Rate.hs index 137ad8e9e7..1f96266ae5 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/Rate.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/Rate.hs @@ -36,7 +36,7 @@ avgRate :: MonadAsync m => avgRate f cfg value rt = f (Stream.avgRate rt . cfg) . sourceUnfoldrM value {- --- parEval should be maxThreads 1 anyway +-- parBuffered should be maxThreads 1 anyway {-# INLINE avgRateThreads1 #-} avgRateThreads1 :: MonadAsync m => ((Config -> Config) -> Stream m Int -> Stream m Int) @@ -68,34 +68,34 @@ constRate f cfg value rt = f (Stream.constRate rt . cfg) . sourceUnfoldrM value o_1_space_async :: Int -> [Benchmark] o_1_space_async value = [ bgroup - "default/parEval" + "default/parBuffered" [ bgroup "avgRate" -- benchIO "unfoldr" $ toNull - [ benchIOSrc "baseline" (Stream.parEval id . sourceUnfoldrM value) - , benchIOSrc "Nothing" $ rateNothing Stream.parEval id value - , benchIOSrc "1M" $ avgRate Stream.parEval id value 1000000 - , benchIOSrc "3M" $ avgRate Stream.parEval id value 3000000 - -- , benchIOSrc "10M/maxThreads1" $ avgRateThreads1 Stream.parEval value 10000000 - , benchIOSrc "10M" $ avgRate Stream.parEval id value 10000000 - , benchIOSrc "20M" $ avgRate Stream.parEval id value 20000000 + [ benchIOSrc "baseline" (Stream.parBuffered id . sourceUnfoldrM value) + , benchIOSrc "Nothing" $ rateNothing Stream.parBuffered id value + , benchIOSrc "1M" $ avgRate Stream.parBuffered id value 1000000 + , benchIOSrc "3M" $ avgRate Stream.parBuffered id value 3000000 + -- , benchIOSrc "10M/maxThreads1" $ avgRateThreads1 Stream.parBuffered value 10000000 + , benchIOSrc "10M" $ avgRate Stream.parBuffered id value 10000000 + , benchIOSrc "20M" $ avgRate Stream.parBuffered id value 20000000 ] , bgroup "minRate" - [ benchIOSrc "1M" $ minRate Stream.parEval id value 1000000 - , benchIOSrc "10M" $ minRate Stream.parEval id value 10000000 - , benchIOSrc "20M" $ minRate Stream.parEval id value 20000000 + [ benchIOSrc "1M" $ minRate Stream.parBuffered id value 1000000 + , benchIOSrc "10M" $ minRate Stream.parBuffered id value 10000000 + , benchIOSrc "20M" $ minRate Stream.parBuffered id value 20000000 ] , bgroup "maxRate" [ -- benchIOSrc "10K" $ maxRate value 10000 - benchIOSrc "10M" $ maxRate Stream.parEval id value 10000000 + benchIOSrc "10M" $ maxRate Stream.parBuffered id value 10000000 ] , bgroup "constRate" [ -- benchIOSrc "10K" $ constRate value 10000 - benchIOSrc "1M" $ constRate Stream.parEval id value 1000000 - , benchIOSrc "10M" $ constRate Stream.parEval id value 10000000 + benchIOSrc "1M" $ constRate Stream.parBuffered id value 1000000 + , benchIOSrc "10M" $ constRate Stream.parBuffered id value 10000000 ] ] ] @@ -103,15 +103,15 @@ o_1_space_async value = o_1_space_ahead :: Int -> [Benchmark] o_1_space_ahead value = [ bgroup - "ordered/parEval" + "ordered/parBuffered" [ benchIOSrc "avgRate/1M" - $ avgRate Stream.parEval (Stream.ordered True) value 1000000 + $ avgRate Stream.parBuffered (Stream.ordered True) value 1000000 , benchIOSrc "minRate/1M" - $ minRate Stream.parEval (Stream.ordered True) value 1000000 + $ minRate Stream.parBuffered (Stream.ordered True) value 1000000 , benchIOSrc "maxRate/1M" - $ maxRate Stream.parEval (Stream.ordered True) value 1000000 + $ maxRate Stream.parBuffered (Stream.ordered True) value 1000000 , benchIOSrc "constRate/1M" - $ constRate Stream.parEval (Stream.ordered True) value 1000000 + $ constRate Stream.parBuffered (Stream.ordered True) value 1000000 ] ] diff --git a/docs/User/HowTo/faq.md b/docs/User/HowTo/faq.md index b891c14c4a..eaf968e6ec 100644 --- a/docs/User/HowTo/faq.md +++ b/docs/User/HowTo/faq.md @@ -27,13 +27,13 @@ f1 x = & Stream.fold Fold.toList -- Fold to list ``` -Use `parApply` to zip streams concurrently. Here, we zip three singleton +Use `parZipWith` to zip streams concurrently. Here, we zip three singleton streams: ```haskell ghci f2 x = - let app = Stream.parApply id + let app = Stream.parZipWith id ($) in (,,) `fmap` Stream.fromEffect (return $ show x) `app` Stream.fromEffect (return $ x + 1) @@ -61,7 +61,7 @@ transformations using the distribute/zip operations. [Just ("1",2,0.5),Just ("2",3,1.0),Just ("3",4,1.5),Just ("4",5,2.0)] ``` -Instead of using `parApply` directly, you can use `mkZipType` to +Instead of using `parZipWith` directly, you can use `mkZipType` to create a zip Applicative newtype so that you can use the `Applicative` instance. @@ -73,7 +73,7 @@ instance. import Streamly.Internal.Data.Stream.TypeGen -app = parApply id +app = parZipWith id ($) $(mkZippingType "ZipConcurrent" "app" True) ``` diff --git a/src/Streamly/Data/Stream/MkType.hs b/src/Streamly/Data/Stream/MkType.hs index cf7aaeaa69..7b22a5f7c2 100644 --- a/src/Streamly/Data/Stream/MkType.hs +++ b/src/Streamly/Data/Stream/MkType.hs @@ -66,8 +66,8 @@ -- For 'Streamly.Prelude.ZipAsync' concurrent zipping applicative type: -- -- >>> :{ --- parApply = Stream.parApply id --- $(mkZipType "ZipAsync" "parApply" True) +-- parCrossApply = Stream.parCrossApply id +-- $(mkZipType "ZipAsync" "parCrossApply" True) -- :} -- -- Instead of using these macros directly you could use the generated code as diff --git a/src/Streamly/Data/Stream/Prelude.hs b/src/Streamly/Data/Stream/Prelude.hs index d516fdb583..436d79a769 100644 --- a/src/Streamly/Data/Stream/Prelude.hs +++ b/src/Streamly/Data/Stream/Prelude.hs @@ -64,13 +64,13 @@ module Streamly.Data.Stream.Prelude -- | Evaluate a stream as a whole concurrently with respect to the consumer -- of the stream. - , parEval + , parBuffered -- *** Generate -- | Generate a stream by evaluating multiple actions concurrently. , parRepeatM , parReplicateM - , fromCallback + , parSetCallback -- *** Map -- | Map actions on a stream such that the mapped actions are evaluated @@ -98,7 +98,7 @@ module Streamly.Data.Stream.Prelude -- *** Stream of streams -- **** Apply - , parApply + , parCrossApply -- **** Concat -- | Shares a single channel across many streams. @@ -138,6 +138,9 @@ module Streamly.Data.Stream.Prelude -- ** Deprecated , tapCount + , parEval + , fromCallback + , parApply ) where @@ -156,15 +159,15 @@ import Streamly.Internal.Data.Stream.Prelude -- -- == Primitives -- --- There are only a few fundamental abstractions for concurrency, 'parEval', +-- There are only a few fundamental abstractions for concurrency, 'parBuffered', -- 'parConcatMap', and 'parConcatIterate', all concurrency combinators can be -- expressed in terms of these. -- --- 'parEval' evaluates a stream as a whole asynchronously with respect to +-- 'parBuffered' evaluates a stream as a whole asynchronously with respect to -- the consumer of the stream. A worker thread evaluates multiple elements of -- the stream ahead of time and buffers the results; the consumer of the stream -- runs in another thread consuming the elements from the buffer, thus --- decoupling the production and consumption of the stream. 'parEval' can be +-- decoupling the production and consumption of the stream. 'parBuffered' can be -- used to run different stages of a pipeline concurrently. -- -- 'parConcatMap' is used to evaluate multiple actions in a stream concurrently @@ -217,8 +220,8 @@ import Streamly.Internal.Data.Stream.Prelude -- Using the few fundamental concurrency primitives we can implement all the -- usual streaming combinators with concurrent behavior. Combinators like -- 'unfoldrM', 'iterateM' that are inherently serial can be evaluated --- concurrently with respect to the consumer pipeline using 'parEval'. --- Combinators like 'zipWithM', 'mergeByM' can also use 'parEval' on the input +-- concurrently with respect to the consumer pipeline using 'parBuffered'. +-- Combinators like 'zipWithM', 'mergeByM' can also use 'parBuffered' on the input -- streams to evaluate them concurrently before combining. -- -- Combinators like 'repeatM', 'replicateM', 'fromListM', 'sequence', 'mapM' in diff --git a/src/Streamly/Internal/Data/Fold/Concurrent.hs b/src/Streamly/Internal/Data/Fold/Concurrent.hs index 1cdf531746..e049247db7 100644 --- a/src/Streamly/Internal/Data/Fold/Concurrent.hs +++ b/src/Streamly/Internal/Data/Fold/Concurrent.hs @@ -8,19 +8,19 @@ -- -- = Asynchronous Evaluation -- --- Using 'parEval' a fold can be decoupled from the driver and evaluated +-- Using 'parBuffered' a fold can be decoupled from the driver and evaluated -- concurrently with the driver. The driver just pushes an element to the -- fold's buffer and waits for async evaluation to finish. -- --- Stages in a fold pipeline can be made concurrent using 'parEval'. +-- Stages in a fold pipeline can be made concurrent using 'parBuffered'. -- -- = Concurrent Fold Combinators -- --- The 'demux' combinator can be made concurrent by using 'parEval' on the fold +-- The 'demux' combinator can be made concurrent by using 'parBuffered' on the fold -- returned by the fold-generating function. Thus, we can fold values for each -- key in the input stream concurrently. -- --- Similarly, we can use 'parEval' with other cobminators like 'toMap', +-- Similarly, we can use 'parBuffered' with other cobminators like 'toMap', -- 'demuxToMap', 'classify', 'tee', 'distribute', 'partition' etc. Basically, -- any combinator that composes multiple folds or multiple instances of a fold -- is a good candidate for running folds concurrently. @@ -28,7 +28,7 @@ -- = Finalization -- -- Before a fold returns "done" it has to drain the child folds. For example, --- consider a "take" operation on a `parEval` fold, the take should return as +-- consider a "take" operation on a `parBuffered` fold, the take should return as -- soon as it has taken required number of elements but we have to ensure that -- any asynchronous child folds finish before it returns. This is achieved by -- calling the "final" operation of the fold. @@ -47,7 +47,7 @@ module Streamly.Internal.Data.Fold.Concurrent ( - parEval + parBuffered , parLmapM , parTeeWith , parDistribute @@ -55,10 +55,14 @@ module Streamly.Internal.Data.Fold.Concurrent , parUnzipWithM , parDistributeScan , parDemuxScan + + -- Deprecated + , parEval ) where #include "inline.hs" +#include "deprecation.hs" import Control.Concurrent (newEmptyMVar, takeMVar, throwTo) import Control.Monad.Catch (throwM) @@ -90,9 +94,9 @@ import Streamly.Internal.Data.Channel.Types -- Evaluating a Fold ------------------------------------------------------------------------------- --- | 'parEval' introduces a concurrent stage at the input of the fold. The +-- | 'parBuffered' introduces a concurrent stage at the input of the fold. The -- inputs are asynchronously queued in a buffer and evaluated concurrently with --- the evaluation of the source stream. On finalization, 'parEval' waits for +-- the evaluation of the source stream. On finalization, 'parBuffered' waits for -- the asynchronous fold to complete before it returns. -- -- In the following example both the stream and the fold have a 1 second delay, @@ -101,7 +105,7 @@ import Streamly.Internal.Data.Channel.Types -- >>> delay x = threadDelay 1000000 >> print x >> return x -- -- >>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3) --- >>> dst = Fold.parEval id (Fold.lmapM delay Fold.sum) +-- >>> dst = Fold.parBuffered id (Fold.lmapM delay Fold.sum) -- >>> Stream.fold dst src -- ... -- @@ -110,9 +114,10 @@ import Streamly.Internal.Data.Channel.Types -- >>> Stream.toList $ Stream.groupsOf 4 dst src -- ... -- -{-# INLINABLE parEval #-} -parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b -parEval modifier f = +{-# INLINABLE parBuffered #-} +parBuffered, parEval + :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b +parBuffered modifier f = Fold step initial extract final where @@ -141,7 +146,7 @@ parEval modifier f = -- events and tick events, latter are guaranteed to arrive. -- -- XXX We can use the config to indicate if the fold is a scanning type or - -- one-shot, or use a separate parEvalScan for scanning. For a scanning + -- one-shot, or use a separate parBufferedScan for scanning. For a scanning -- type fold the worker would always send the intermediate values back to -- the driver. An intermediate value can be returned on an input, or the -- driver can poll even without input, if we have the Skip input support. @@ -173,13 +178,14 @@ parEval modifier f = $ withDiagMVar (svarInspectMode chan) (dumpChannel chan) - "parEval: waiting to drain" + "parBuffered: waiting to drain" $ takeMVar (outputDoorBell chan) -- XXX remove recursion final chan Just b -> do cleanup chan return b +RENAME(parEval,parBuffered) -- XXX We can have a lconcatMap (unfoldMany) to expand the chunks in the input -- to streams before folding. This will require an input Skip constructor. In @@ -198,7 +204,7 @@ parLmapM = undefined -- -- Definition: -- --- >>> parTeeWith cfg f c1 c2 = Fold.teeWith f (Fold.parEval cfg c1) (Fold.parEval cfg c2) +-- >>> parTeeWith cfg f c1 c2 = Fold.teeWith f (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2) -- -- Example: -- @@ -216,13 +222,13 @@ parTeeWith :: MonadAsync m => -> Fold m x a -> Fold m x b -> Fold m x c -parTeeWith cfg f c1 c2 = Fold.teeWith f (parEval cfg c1) (parEval cfg c2) +parTeeWith cfg f c1 c2 = Fold.teeWith f (parBuffered cfg c1) (parBuffered cfg c2) -- | Distribute the input to all the folds in the supplied list concurrently. -- -- Definition: -- --- >>> parDistribute cfg = Fold.distribute . fmap (Fold.parEval cfg) +-- >>> parDistribute cfg = Fold.distribute . fmap (Fold.parBuffered cfg) -- -- Example: -- @@ -235,14 +241,14 @@ parTeeWith cfg f c1 c2 = Fold.teeWith f (parEval cfg c1) (parEval cfg c2) {-# INLINABLE parDistribute #-} parDistribute :: MonadAsync m => (Config -> Config) -> [Fold m a b] -> Fold m a [b] -parDistribute cfg = Fold.distribute . fmap (parEval cfg) +parDistribute cfg = Fold.distribute . fmap (parBuffered cfg) -- | Select first fold for Left input and second for Right input. Both folds -- run concurrently. -- -- Definition -- --- >>> parPartition cfg c1 c2 = Fold.partition (Fold.parEval cfg c1) (Fold.parEval cfg c2) +-- >>> parPartition cfg c1 c2 = Fold.partition (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2) -- -- Example: -- @@ -256,14 +262,14 @@ parDistribute cfg = Fold.distribute . fmap (parEval cfg) {-# INLINABLE parPartition #-} parPartition :: MonadAsync m => (Config -> Config) -> Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y) -parPartition cfg c1 c2 = Fold.partition (parEval cfg c1) (parEval cfg c2) +parPartition cfg c1 c2 = Fold.partition (parBuffered cfg c1) (parBuffered cfg c2) -- | Split and distribute the output to two different folds and then zip the -- results. Both the consumer folds run concurrently. -- -- Definition -- --- >>> parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (Fold.parEval cfg c1) (Fold.parEval cfg c2) +-- >>> parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2) -- -- Example: -- @@ -277,7 +283,7 @@ parPartition cfg c1 c2 = Fold.partition (parEval cfg c1) (parEval cfg c2) {-# INLINABLE parUnzipWithM #-} parUnzipWithM :: MonadAsync m => (Config -> Config) -> (a -> m (b,c)) -> Fold m b x -> Fold m c y -> Fold m a (x,y) -parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (parEval cfg c1) (parEval cfg c2) +parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (parBuffered cfg c1) (parBuffered cfg c2) -- There are two ways to implement a concurrent scan. -- diff --git a/src/Streamly/Internal/Data/Stream/Concurrent.hs b/src/Streamly/Internal/Data/Stream/Concurrent.hs index f0d8f2dc1a..73bf932826 100644 --- a/src/Streamly/Internal/Data/Stream/Concurrent.hs +++ b/src/Streamly/Internal/Data/Stream/Concurrent.hs @@ -22,7 +22,7 @@ module Streamly.Internal.Data.Stream.Concurrent -- ** Evaluate -- | Evaluates a stream concurrently using a channel. - , parEval + , parBuffered -- Add unfoldrM/iterateM? -- ** Generate @@ -55,7 +55,7 @@ module Streamly.Internal.Data.Stream.Concurrent -- ** Stream of streams -- *** Apply - , parApply + , parCrossApply -- *** Concat -- | Shares a single channel across many streams. @@ -66,13 +66,20 @@ module Streamly.Internal.Data.Stream.Concurrent , parConcatIterate -- ** Reactive - , fromCallback + , newStreamAndCallback + , parSetCallback , parTapCount , tapCount + + -- ** Deprecated + , parEval + , parApply + , fromCallback ) where #include "inline.hs" +#include "deprecation.h" import Control.Concurrent (myThreadId, killThread) import Control.Monad (void, when) @@ -121,9 +128,9 @@ import Streamly.Internal.Data.Channel.Types ------------------------------------------------------------------------------- {- -{-# INLINE_NORMAL parEvalD #-} -parEvalD :: MonadAsync m => (Config -> Config) -> D.Stream m a -> D.Stream m a -parEvalD modifier m = D.Stream step Nothing +{-# INLINE_NORMAL parBufferedD #-} +parBufferedD :: MonadAsync m => (Config -> Config) -> D.Stream m a -> D.Stream m a +parBufferedD modifier m = D.Stream step Nothing where step _ Nothing = do @@ -140,14 +147,14 @@ parEvalD modifier m = D.Stream step Nothing D.Stop -> D.Stop -} --- | 'parEval' evaluates a stream as a whole asynchronously with respect to +-- | 'parBuffered' evaluates a stream as a whole asynchronously with respect to -- the consumer of the stream. A worker thread evaluates multiple elements of -- the stream ahead of time and buffers the results; the consumer of the stream -- runs in another thread consuming the elements from the buffer, thus --- decoupling the production and consumption of the stream. 'parEval' can be +-- decoupling the production and consumption of the stream. 'parBuffered' can be -- used to run different stages of a pipeline concurrently. -- --- It is important to note that 'parEval' does not evaluate individual actions +-- It is important to note that 'parBuffered' does not evaluate individual actions -- in the stream concurrently with respect to each other, it merely evaluates -- the stream serially but in a different thread than the consumer thread, -- thus the consumer and producer can run concurrently. See 'parMapM' and @@ -156,16 +163,18 @@ parEvalD modifier m = D.Stream step Nothing -- The evaluation requires only one thread as only one stream needs to be -- evaluated. Therefore, the concurrency options that are relevant to multiple -- streams do not apply here e.g. maxThreads, eager, interleaved, ordered, --- stopWhen options do not have any effect on 'parEval'. +-- stopWhen options do not have any effect on 'parBuffered'. -- -- Useful idioms: -- --- >>> parUnfoldrM step = Stream.parEval id . Stream.unfoldrM step --- >>> parIterateM step = Stream.parEval id . Stream.iterateM step -{-# INLINE parEval #-} -parEval :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a -parEval modifier input = withChannel modifier input (const id) - -- Stream.fromStreamD $ parEvalD cfg $ Stream.toStreamD stream +-- >>> parUnfoldrM step = Stream.parBuffered id . Stream.unfoldrM step +-- >>> parIterateM step = Stream.parBuffered id . Stream.iterateM step +{-# INLINE parBuffered #-} +parBuffered, parEval + :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a +parBuffered modifier input = withChannel modifier input (const id) + -- Stream.fromStreamD $ parBufferedD cfg $ Stream.toStreamD stream +RENAME(parEval,parBuffered) ------------------------------------------------------------------------------- -- combining two streams @@ -410,16 +419,17 @@ parListEagerMin = parList (eager True . stopWhen AnyStops) -- | Apply an argument stream to a function stream concurrently. Uses a -- shared channel for all individual applications within a stream application. -{-# INLINE parApply #-} -{-# SPECIALIZE parApply :: +{-# INLINE parCrossApply #-} +{-# SPECIALIZE parCrossApply :: (Config -> Config) -> Stream IO (a -> b) -> Stream IO a -> Stream IO b #-} -parApply :: MonadAsync m => +parCrossApply, parApply :: MonadAsync m => (Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b -parApply modifier stream1 stream2 = +parCrossApply modifier stream1 stream2 = parConcatMap modifier (\g -> parConcatMap modifier (Stream.fromPure . g) stream2) stream1 +RENAME(parApply,parCrossApply) ------------------------------------------------------------------------------- -- Map @@ -463,13 +473,13 @@ parSequence modifier = parMapM modifier id -- | Evaluates the streams being zipped in separate threads than the consumer. -- The zip function is evaluated in the consumer thread. -- --- >>> parZipWithM cfg f m1 m2 = Stream.zipWithM f (Stream.parEval cfg m1) (Stream.parEval cfg m2) +-- >>> parZipWithM cfg f m1 m2 = Stream.zipWithM f (Stream.parBuffered cfg m1) (Stream.parBuffered cfg m2) -- -- Multi-stream concurrency options won't apply here, see the notes in --- 'parEval'. +-- 'parBuffered'. -- -- If you want to evaluate the zip function as well in a separate thread, you --- can use a 'parEval' on 'parZipWithM'. +-- can use a 'parBuffered' on 'parZipWithM'. -- {-# INLINE parZipWithM #-} parZipWithM :: MonadAsync m @@ -478,7 +488,8 @@ parZipWithM :: MonadAsync m -> Stream m a -> Stream m b -> Stream m c -parZipWithM cfg f m1 m2 = Stream.zipWithM f (parEval cfg m1) (parEval cfg m2) +parZipWithM cfg f m1 m2 = + Stream.zipWithM f (parBuffered cfg m1) (parBuffered cfg m2) -- | -- >>> parZipWith cfg f = Stream.parZipWithM cfg (\a b -> return $ f a b) @@ -501,7 +512,7 @@ parZipWith cfg f = parZipWithM cfg (\a b -> return $ f a b) -- -- Definition: -- --- >>> parMergeByM cfg f m1 m2 = Stream.mergeByM f (Stream.parEval cfg m1) (Stream.parEval cfg m2) +-- >>> parMergeByM cfg f m1 m2 = Stream.mergeByM f (Stream.parBuffered cfg m1) (Stream.parBuffered cfg m2) -- {-# INLINE parMergeByM #-} parMergeByM :: MonadAsync m @@ -510,7 +521,8 @@ parMergeByM :: MonadAsync m -> Stream m a -> Stream m a -> Stream m a -parMergeByM cfg f m1 m2 = Stream.mergeByM f (parEval cfg m1) (parEval cfg m2) +parMergeByM cfg f m1 m2 = + Stream.mergeByM f (parBuffered cfg m1) (parBuffered cfg m2) -- | Like 'mergeBy' but evaluates both the streams concurrently. -- @@ -600,9 +612,9 @@ parReplicateM cfg n = parSequence cfg . Stream.replicate n -- -- /Pre-release/ -- -{-# INLINE_NORMAL newCallbackStream #-} -newCallbackStream :: MonadAsync m => m (a -> m (), Stream m a) -newCallbackStream = do +{-# INLINE_NORMAL newStreamAndCallback #-} +newStreamAndCallback :: MonadAsync m => m (a -> m (), Stream m a) +newStreamAndCallback = do chan <- newChannel (eager True) -- XXX Add our own thread-id to the SVar as we can not know the callback's @@ -622,15 +634,15 @@ newCallbackStream = do -- XXX Use fromChannelD? return (callback, fromChannel chan) --- XXX Rename this to parSetCallback. Also take the Channel config as argument. --- What config can be set by user here? +-- XXX Take the Channel config as argument. What config can be set by user +-- here? -- -- XXX What happens if an exception occurs when evaluating the stream? The -- result of callback can be used to communicate that. But we can only know -- about the exception on the next callback call. For better handling the user --- can supply an exception sender function as argument to fromCallback. +-- can supply an exception sender function as argument to parSetCallback. --- | @fromCallback f@ creates an entangled pair of a callback and a stream i.e. +-- | @parSetCallback f@ creates an entangled pair of a callback and a stream i.e. -- whenever the callback is called a value appears in the stream. The function -- @f@ is invoked with the callback as argument, and the stream is returned. -- @f@ would store the callback for calling it later for generating values in @@ -641,12 +653,14 @@ newCallbackStream = do -- -- /Pre-release/ -- -{-# INLINE fromCallback #-} -fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a -fromCallback setCallback = Stream.concatEffect $ do - (callback, stream) <- newCallbackStream +{-# INLINE parSetCallback #-} +parSetCallback, fromCallback + :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a +parSetCallback setCallback = Stream.concatEffect $ do + (callback, stream) <- newStreamAndCallback setCallback callback return stream +RENAME(fromCallback,parSetCallback) -- | @parTapCount predicate fold stream@ taps the count of those elements in -- the stream that pass the @predicate@. The resulting count stream is sent to @@ -777,7 +791,7 @@ tapCount = parTapCount parTap :: MonadAsync m => (Stream m a -> m b) -> Stream m a -> Stream m a parTap f m = undefined --- Can we just use a parEval fold in tap? +-- Can we just use a parBuffered fold in tap? -- We can easily convert the Fold to "Stream m a -> m b" form. Check if this -- provides the same perf as above. {-# INLINE parTap #-} diff --git a/src/Streamly/Internal/Data/Stream/Time.hs b/src/Streamly/Internal/Data/Stream/Time.hs index d699fca014..849bca1e69 100644 --- a/src/Streamly/Internal/Data/Stream/Time.hs +++ b/src/Streamly/Internal/Data/Stream/Time.hs @@ -151,11 +151,11 @@ ticks = periodic (return ()) -- speed. -- -- >>> tickStream = Stream.repeatM (return ()) --- >>> ticksRate r = Stream.parEval (Stream.rate (Just r)) tickStream +-- >>> ticksRate r = Stream.parBuffered (Stream.rate (Just r)) tickStream -- {-# INLINE ticksRate #-} ticksRate :: MonadAsync m => Rate -> Stream m () -ticksRate r = parEval (rate (Just r)) $ Stream.repeatM (return ()) +ticksRate r = parBuffered (rate (Just r)) $ Stream.repeatM (return ()) -- XXX The case when the interval is 0, we should run only the stream being -- interjected. @@ -236,7 +236,7 @@ dropLastInterval = undefined -- -- Example: -- --- >>> twoPerSec = Stream.parEval (Stream.constRate 2) $ Stream.enumerateFrom 1 +-- >>> twoPerSec = Stream.parBuffered (Stream.constRate 2) $ Stream.enumerateFrom 1 -- >>> intervals = Stream.intervalsOf 1 Fold.toList twoPerSec -- >>> Stream.fold Fold.toList $ Stream.take 2 intervals -- [...,...] diff --git a/test/Streamly/Test/Data/Stream/Concurrent.hs b/test/Streamly/Test/Data/Stream/Concurrent.hs index b0e6e1b669..4f5a3a4de7 100644 --- a/test/Streamly/Test/Data/Stream/Concurrent.hs +++ b/test/Streamly/Test/Data/Stream/Concurrent.hs @@ -222,12 +222,12 @@ sequenceReplicate cfg = constructWithLenM stream list drainMapM :: Monad m => (a -> m b) -> Stream m a -> m () drainMapM f = Stream.fold (Fold.drainMapM f) -testFromCallback :: IO Int -testFromCallback = do +testParSetCallback :: IO Int +testParSetCallback = do ref <- newIORef Nothing let stream = Stream.parList (Stream.eager True) - [ fmap Just (Stream.fromCallback (setCallback ref)) + [ fmap Just (Stream.parSetCallback (setCallback ref)) , runCallback ref ] Stream.fold Fold.sum $ fmap fromJust $ Stream.takeWhile isJust stream @@ -259,10 +259,10 @@ main = hspec $ describe moduleName $ do let transform = transformCombineFromList Stream.fromList sortEq - prop "parEval" $ + prop "parBuffered" $ transform (fmap (+2)) - (fmap (+1) . Async.parEval id . fmap (+1)) + (fmap (+1) . Async.parBuffered id . fmap (+1)) asyncSpec $ prop "parSequence" . sequenceReplicate @@ -346,17 +346,17 @@ main = hspec (Stream.fromPure 1) (Stream.fromPure 2) s1 cfg = - Async.parApply + Async.parCrossApply cfg (Stream.fromPure (,)) (par2 cfg) s2 cfg = - Async.parApply + Async.parCrossApply cfg (s1 cfg) (Stream.fromPure 3) :: Stream IO (Int, Int) in prop1 - "parApply (async arg1)" . cmp (==) ( [(1, 3), (2, 3)]) . s2 + "parCrossApply (async arg1)" . cmp (==) ( [(1, 3), (2, 3)]) . s2 asyncSpec $ let par2 cfg = @@ -365,7 +365,7 @@ main = hspec (Stream.fromPure (2 :: Int)) (Stream.fromPure 3) s1 = Stream.fromPure (1 :: Int,) - s2 cfg = Async.parApply cfg s1 (par2 cfg) + s2 cfg = Async.parCrossApply cfg s1 (par2 cfg) in prop1 "apply (async arg2)" . cmp (==) ([(1, 2), (1, 3)]) . s2 -- concat @@ -393,4 +393,4 @@ main = hspec describe "Exception propagation" $ exceptionPropagation async -- Ad-hoc tests it "takes n from stream of streams" $ takeCombined 2 - it "fromCallback" $ testFromCallback `shouldReturn` (50*101) + it "parSetCallback" $ testParSetCallback `shouldReturn` (50*101)