Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ jobs:
DISABLE_BENCH: "y"
DISABLE_DOCS: "y"
ENABLE_DOCSPEC: "y"
DOCSPEC_URL: https://github.com/phadej/cabal-extras/releases/download/cabal-docspec-0.0.0.20210111/cabal-docspec-0.0.0.20210111.xz
DOCSPEC_URL: https://github.com/phadej/cabal-extras/releases/download/cabal-docspec-0.0.0.20250606/cabal-docspec-0.0.0.20250606-x86_64-linux.xz
DOCSPEC_OPTIONS: "--timeout 60 --check-properties --property-variables xs"
command: |
sed -i 's/other-modules:/exposed-modules:/g' streamly.cabal
Expand Down
1 change: 0 additions & 1 deletion core/src/Streamly/Internal/Data/StreamK.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,6 @@ parseChunksGeneric = GenArr.parse
--
{-# INLINE sortBy #-}
sortBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a
-- sortBy f = Stream.concatPairsWith (Stream.mergeBy f) Stream.fromPure
sortBy cmp =
let p =
Parser.groupByRollingEither
Expand Down
1 change: 1 addition & 0 deletions core/src/doctest/DocTestDataStreamK.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{- $setup

>>> :m
>>> :set -XFlexibleContexts
>>> import Control.Concurrent (threadDelay)
>>> import Data.Function (fix, (&))
>>> import Data.Semigroup (cycle1)
Expand Down
1 change: 1 addition & 0 deletions core/src/doctest/DocTestFileSystemWindowsPath.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{- $setup
>>> :m
>>> :set -XQuasiQuotes
>>> :set -XScopedTypeVariables
>>> import Control.Exception (SomeException, evaluate, try)
>>> import Data.Either (Either, isLeft)
>>> import Data.Maybe (fromJust, isNothing, isJust)
Expand Down
2 changes: 2 additions & 0 deletions docs/User/Project/Upgrading-0.8-to-0.9.md
Original file line number Diff line number Diff line change
Expand Up @@ -390,3 +390,5 @@ new release is `parConcatMap`. The config argument in `parConcatMap` can
specify an equivalent of the combining operation. Similarly, concurrent
`concatFoldableWith`, `concatMapFoldableWith`, `concatForFoldableWith` can also
be expressed using `parConcatMap`.

The equivalent of `tapAsync` is `tap` with a `parBuffered` fold.
66 changes: 50 additions & 16 deletions src/Streamly/Data/Stream/MkType.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,47 +31,66 @@
--
-- == Parallel
--
-- An unordered concurrent version of the serial 'Nested' type. Runs multiple
-- iterations of the nested loops concurrently, iterations may execute out of
-- order. More outer iterations are started only if the existing inner
-- iterations are not saturating the resources.
-- A newtype wrapper over the 'Stream' type; the Applicative and Monad
-- instances generate a cross product of the two streams in a concurrent
-- manner. The order in which the stream elements are produced is not
-- deterministic, this is supposed to be used if order does not matter.

-- Loops over the outer stream, generating multiple elements concurrently; for
-- each outer stream element, loop over the inner stream concurrently. More
-- outer iterations are started only if the existing inner iterations are not
-- saturating the resources.
--
-- Use 'mkParallel' to construct from 'Stream' type and 'unParallel' to
-- deconstruct back to 'Stream'.
--
-- >>> :{
-- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b
-- bind = flip (Stream.parConcatMap id)
-- $(mkCrossType "Parallel" "bind" True)
-- :}
--
-- This is a bounded concurrent, unordered list-transformer (ListT) monad.
--
-- WARNING! By design, monad bind of this type is not associative, because of
-- concurrency order of effects as well as results may be unpredictable.
-- concurrency, order of effects as well as results is non-deterministic.
--
-- Same as the deprecated 'Streamly.Prelude.AsyncT' type.
-- Serves the same purpose as the 'Streamly.Prelude.AsyncT' type in older
-- releases.
--
-- == FairParallel
--
-- Like Parallel but strikes a balance between going deeper into existing
-- iterations of the loop and starting new iterations.
-- iterations of the loop and starting new outer loop iterations.
--
-- Use 'mkFairParallel' to construct from 'Stream' type and 'unFairParallel' to
-- deconstruct back to 'Stream'.
--
-- >>> :{
-- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b
-- bind = flip (Stream.parConcatMap (Stream.interleaved True))
-- $(mkCrossType "FairParallel" "bind" True)
-- :}
--
-- This is a bounded concurrent, fair logic programming (LogicT) monad.
--
-- WARNING! By design, monad bind of this type is not associative, because of
-- concurrency order of effects as well as results may be unpredictable.
-- concurrency, order of effects as well as results may be unpredictable.
--
-- Same as the deprecated 'Streamly.Prelude.WAsyncT' type.
-- Serves the same purpose as the 'Streamly.Prelude.WAsyncT' type in older
-- releases.
--
-- == EagerParallel
--
-- Like Parallel, but executes as many actions concurrently as possible. This
-- is useful if you want all actions to be scheduled at the same time so that
-- something does not get starved due to others.
--
-- Use 'mkEagerParallel' to construct from 'Stream' type and 'unEagerParallel'
-- to deconstruct back to 'Stream'.
--
-- >>> :{
-- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b
-- parBind = flip (Stream.parConcatMap (Stream.eager True))
-- $(mkCrossType "EagerParallel" "parBind" True)
-- :}
Expand All @@ -81,7 +100,8 @@
-- WARNING! By design, monad bind of this type is not associative, because of
-- concurrency order of effects as well as results may be unpredictable.
--
-- Same as the deprecated 'Streamly.Prelude.ParallelT' type.
-- Serves the same purpose as the 'Streamly.Prelude.ParallelT' type in older
-- releases.
--
-- == OrderedParallel
--
Expand All @@ -90,7 +110,11 @@
-- specified in the code. This is closest to the serial Nested type in behavior
-- among all the concurrent types.
--
-- Use 'mkOrderedParallel' to construct from 'Stream' type and
-- 'unOrderedParallel' to deconstruct back to 'Stream'.
--
-- >>> :{
-- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b
-- bind = flip (Stream.parConcatMap (Stream.ordered True))
-- $(mkCrossType "OrderedParallel" "bind" True)
-- :}
Expand All @@ -100,26 +124,36 @@
-- WARNING! Monad bind of this type is associative for values, but because of
-- concurrency, order of effects may be unpredictable.
--
-- Same as the deprecated 'Streamly.Prelude.AheadT' type.
-- Serves the same purpose as the 'Streamly.Prelude.AheadT' type in older
-- releases.
--
-- == Zip
--
-- An applicative type to zip two streams.
-- A newtype wrapper over the 'Stream' type, the applicative instance zips two
-- streams.
--
-- Use 'mkZip' to construct from 'Stream' type and 'unZip' to deconstruct back
-- to 'Stream'.
--
-- >>> :{
-- zipApply :: Monad m => Stream m (a -> b) -> Stream m a -> Stream m b
-- zipApply = Stream.zipWith ($)
-- $(mkZipType "Zip" "zipApply" False)
-- :}
--
-- Same as the deprcated 'Streamly.Prelude.ZipSerialM' type.
--
-- == ParZip
-- == ZipParallel
--
-- Like Zip but evaluates the streams being zipped concurrently.
--
-- Like Zip but evaluates the two streams concurrently.
-- Use 'mkZipParallel' to construct from 'Stream' type and 'unZipParallel' to
-- deconstruct back to 'Stream'.
--
-- >>> :{
-- parCrossApply = Stream.parCrossApply id
-- $(mkZipType "ParZip" "parCrossApply" True)
-- parZipApply :: MonadAsync m => Stream m (a -> b) -> Stream m a -> Stream m b
-- parZipApply = Stream.parZipWith id id
-- $(mkZipType "ZipParallel" "parZipApply" True)
-- :}
--
-- Same as the deprecated 'Streamly.Prelude.ZipAsync' type.
Expand Down
Loading