Skip to content

Commit

Permalink
Revert parSetCallback to fromCallback
Browse files Browse the repository at this point in the history
  • Loading branch information
adithyaov committed Feb 6, 2025
1 parent 4f42fb9 commit cbcf021
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 15 deletions.
3 changes: 1 addition & 2 deletions src/Streamly/Data/Stream/Prelude.hs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ module Streamly.Data.Stream.Prelude
-- | Generate a stream by evaluating multiple actions concurrently.
, parRepeatM
, parReplicateM
, parSetCallback
, fromCallback

-- *** Map
-- | Map actions on a stream such that the mapped actions are evaluated
Expand Down Expand Up @@ -139,7 +139,6 @@ module Streamly.Data.Stream.Prelude
-- ** Deprecated
, tapCount
, parEval
, fromCallback
, parApply
)
where
Expand Down
15 changes: 6 additions & 9 deletions src/Streamly/Internal/Data/Stream/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,13 @@ module Streamly.Internal.Data.Stream.Concurrent

-- ** Reactive
, newStreamAndCallback
, parSetCallback
, fromCallback
, parTapCount
, tapCount

-- ** Deprecated
, parEval
, parApply
, fromCallback
)
where

Expand Down Expand Up @@ -640,9 +639,9 @@ newStreamAndCallback = do
-- XXX What happens if an exception occurs when evaluating the stream? The
-- result of callback can be used to communicate that. But we can only know
-- about the exception on the next callback call. For better handling the user
-- can supply an exception sender function as argument to parSetCallback.
-- can supply an exception sender function as argument to fromCallback.

-- | @parSetCallback f@ creates an entangled pair of a callback and a stream i.e.
-- | @fromCallback f@ creates an entangled pair of a callback and a stream i.e.
-- whenever the callback is called a value appears in the stream. The function
-- @f@ is invoked with the callback as argument, and the stream is returned.
-- @f@ would store the callback for calling it later for generating values in
Expand All @@ -653,14 +652,12 @@ newStreamAndCallback = do
--
-- /Pre-release/
--
{-# INLINE parSetCallback #-}
parSetCallback, fromCallback
:: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a
parSetCallback setCallback = Stream.concatEffect $ do
{-# INLINE fromCallback #-}
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a
fromCallback setCallback = Stream.concatEffect $ do
(callback, stream) <- newStreamAndCallback
setCallback callback
return stream
RENAME(fromCallback,parSetCallback)

-- | @parTapCount predicate fold stream@ taps the count of those elements in
-- the stream that pass the @predicate@. The resulting count stream is sent to
Expand Down
8 changes: 4 additions & 4 deletions test/Streamly/Test/Data/Stream/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ sequenceReplicate cfg = constructWithLenM stream list
drainMapM :: Monad m => (a -> m b) -> Stream m a -> m ()
drainMapM f = Stream.fold (Fold.drainMapM f)

testParSetCallback :: IO Int
testParSetCallback = do
testFromCallback :: IO Int
testFromCallback = do
ref <- newIORef Nothing
let stream =
Stream.parList (Stream.eager True)
[ fmap Just (Stream.parSetCallback (setCallback ref))
[ fmap Just (Stream.fromCallback (setCallback ref))
, runCallback ref
]
Stream.fold Fold.sum $ fmap fromJust $ Stream.takeWhile isJust stream
Expand Down Expand Up @@ -393,4 +393,4 @@ main = hspec
describe "Exception propagation" $ exceptionPropagation async
-- Ad-hoc tests
it "takes n from stream of streams" $ takeCombined 2
it "parSetCallback" $ testParSetCallback `shouldReturn` (50*101)
it "fromCallback" $ testFromCallback `shouldReturn` (50*101)

0 comments on commit cbcf021

Please sign in to comment.