Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Control/Concurrent/Async/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ asyncOnWithUnmask cpu actionWith =
asyncUsing ::
CALLSTACK
(IO () -> IO ThreadId) -> IO a -> IO (Async a)
asyncUsing doFork = \action -> do
asyncUsing doFork action = do
var <- newEmptyTMVarIO
let action_plus = debugLabelMe >> action
-- t <- forkFinally action (\r -> atomically $ putTMVar var r)
Expand Down Expand Up @@ -207,7 +207,7 @@ withAsyncUsing ::
(IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
-- The bracket version works, but is slow. We can do better by
-- hand-coding it:
withAsyncUsing doFork = \action inner -> do
withAsyncUsing doFork action inner = do
var <- newEmptyTMVarIO
mask $ \restore -> do
let action_plus = debugLabelMe >> action
Expand Down Expand Up @@ -734,7 +734,7 @@ concurrently' left right collect = do
-- ensure the children are really dead
replicateM_ count' (tryAgain $ takeMVar done)

r <- collect (tryAgain $ takeDone) `onException` stop
r <- collect (tryAgain takeDone) `onException` stop
stop
return r

Expand Down Expand Up @@ -801,7 +801,7 @@ forConcurrently_ = flip mapConcurrently_
replicateConcurrently ::
CALLSTACK
Int -> IO a -> IO [a]
replicateConcurrently cnt = runConcurrently . sequenceA . replicate cnt . Concurrently
replicateConcurrently cnt = runConcurrently . replicateM cnt . Concurrently

-- | Same as 'replicateConcurrently', but ignore the results.
--
Expand Down Expand Up @@ -927,7 +927,7 @@ rawForkIO ::
CALLSTACK
IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
case fork# action_plus s of (# s1, tid #) -> (# s1, ThreadId tid #)
where
(IO action_plus) = debugLabelMe >> action

Expand All @@ -936,7 +936,7 @@ rawForkOn ::
CALLSTACK
Int -> IO () -> IO ThreadId
rawForkOn (I# cpu) action = IO $ \ s ->
case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
case forkOn# cpu action_plus s of (# s1, tid #) -> (# s1, ThreadId tid #)
where
(IO action_plus) = debugLabelMe >> action

Expand Down
89 changes: 89 additions & 0 deletions Control/Concurrent/Async/Warden.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
{-
Copyright (c) Meta Platforms, Inc. and affiliates.
All rights reserved.

This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
-}

{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | A more flexible way to create 'Async's and have them automatically
-- cancelled when the 'Warden' is shut down.
Comment on lines +12 to +13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what your use case is, but this is an example of a more general pattern; see e.g. https://hackage.haskell.org/package/io-region
(For the record: I'm not suggesting anything, just my 2c.)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use case I have is Glean (https://github.com/facebookincubator/Glean/blob/main/glean/db/Glean/Database/Env.hs#L102) and this also came up in a discussion on discourse https://discourse.haskell.org/t/multitasking-a-new-concurrency-library/12409/1

I think it's simple enough that we don't need to implement it in terms of anything else, but thanks for the pointer.

module Control.Concurrent.Async.Warden
( Warden
, withWarden
, create
, shutdown
, spawn
, spawn_
, spawnMask
, WardenException(..)
) where

import Control.Concurrent (forkIO)
import Control.Concurrent.Async (Async)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Data.HashSet (HashSet)
import qualified Data.HashSet as HashSet
import System.IO (fixIO)


-- | A 'Warden' is an owner of 'Async's which cancels them on 'shutdown'.
--
-- The 'MVar' holds the set of currently owned 'Async's, stored as
-- @Async ()@ (i.e. with their result type discarded).
-- 'Nothing' in the MVar means the 'Warden' has been shut down.
newtype Warden = Warden (MVar (Maybe (HashSet (Async ()))))

-- | Create a fresh 'Warden', pass it to the given action, and 'shutdown'
-- the 'Warden' when the action exits (normally or via an exception).
withWarden :: (Warden -> IO a) -> IO a
withWarden body = bracket create shutdown body

-- | Make a new 'Warden' that does not own any threads yet.
create :: IO Warden
create = fmap Warden (newMVar (Just HashSet.empty))

-- | Shut down a 'Warden', calling 'Async.cancel' concurrently on all owned
-- threads.
--
-- Once a 'Warden' has been shut down, further calls to 'shutdown' are
-- no-ops, while 'spawn', 'spawn_' and 'spawnMask' throw a
-- 'WardenException'.
--
-- Note that any exceptions thrown by the threads will be ignored. If you want
-- exceptions to be propagated, either call `wait` explicitly on the 'Async',
-- or use 'link'.
shutdown :: Warden -> IO ()
shutdown (Warden v) = do
  -- Mark the warden as shut down first, taking whatever it owned.
  owned <- swapMVar v Nothing
  -- 'owned' is 'Nothing' when the warden was already shut down, in which
  -- case there is nothing left to cancel.
  mapM_ (Async.mapConcurrently_ Async.cancel) owned

-- | Remove an 'Async' from the set owned by the 'Warden'. Does nothing if
-- the 'Warden' has already been shut down.
forget :: Warden -> Async a -> IO ()
forget (Warden v) async = modifyMVar_ v prune
  where
    prune Nothing = return Nothing
    -- Force the updated set before storing it back in the 'MVar'.
    prune (Just owned) = return $! Just $! HashSet.delete (void async) owned

-- | Start a thread owned by the 'Warden' with asynchronous exceptions
-- masked; the action receives a function it can use to unmask them.
--
-- Throws 'WardenException' if the 'Warden' has already been shut down.
spawnMask :: Warden -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a)
spawnMask (Warden v) action = modifyMVar v register
  where
    register Nothing = throwIO $ WardenException "Warden has been shut down"
    register (Just owned) = do
      -- The new thread needs a reference to its own 'Async' so that it can
      -- remove itself from the 'HashSet' when it exits; 'fixIO' ties that
      -- knot.
      self <- fixIO $ \self ->
        mask_ $ Async.asyncWithUnmask $ \restore ->
          action restore `finally` forget (Warden v) self
      return (Just $ HashSet.insert (void self) owned, self)

-- | Thrown by 'spawnMask' (and therefore by 'spawn' and 'spawn_') when the
-- 'Warden' has already been shut down. The 'String' carries the message.
newtype WardenException = WardenException String
  deriving Show

instance Exception WardenException

-- | Start a thread owned by the 'Warden' and return its 'Async'. The action
-- itself runs unmasked.
spawn :: Warden -> IO a -> IO (Async a)
spawn warden action = spawnMask warden (\restore -> restore action)

-- | Like 'spawn', but discards the resulting 'Async'.
spawn_ :: Warden -> IO () -> IO ()
spawn_ w action = void (spawn w action)
138 changes: 138 additions & 0 deletions Control/Concurrent/Stream.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
{-
Copyright (c) Meta Platforms, Inc. and affiliates.
All rights reserved.

This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
-}

-- | Processing streams with a fixed number of worker threads
module Control.Concurrent.Stream
( stream
, streamBound
, streamWithInput
, streamWithOutput
, streamWithInputOutput
, mapConcurrentlyBounded
, forConcurrentlyBounded
) where

import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Maybe
import Data.IORef

-- Selects how worker threads are started: 'BoundThreads' uses
-- 'withAsyncBound', 'UnboundThreads' uses 'mapConcurrently_'.
data ShouldBindThreads = BoundThreads | UnboundThreads

-- | Run a fixed number of workers concurrently over a stream of values
-- generated by a producer function. The producer receives a callback which
-- it invokes once per work item. A synchronous exception thrown by a worker
-- is propagated to the caller.
stream
  :: Int -- ^ Maximum Concurrency
  -> ((a -> IO ()) -> IO ()) -- ^ Producer
  -> (a -> IO ()) -- ^ Worker
  -> IO ()
stream maxConcurrency producer worker =
  streamWithInput producer workerSlots (\_ item -> worker item)
  where
    -- One dummy input per worker; the workers ignore it.
    workerSlots = replicate maxConcurrency ()

-- | Variant of 'stream' whose workers run in bound threads. See
-- 'Control.Concurrent.forkOS' for details on bound threads.
streamBound
  :: Int -- ^ Maximum Concurrency
  -> ((a -> IO ()) -> IO ()) -- ^ Producer
  -> (a -> IO ()) -- ^ Worker
  -> IO ()
streamBound maxConcurrency producer worker =
  stream_ BoundThreads producer workerSlots (\_ item -> worker item)
  where
    -- One dummy input per worker; the workers ignore it.
    workerSlots = replicate maxConcurrency ()

-- | Variant of 'stream' that starts one worker per element of the given
-- list and passes that element to the worker on every call.
streamWithInput
  :: ((a -> IO ()) -> IO ()) -- ^ Producer
  -> [b] -- ^ Worker state
  -> (b -> a -> IO ()) -- ^ Worker
  -> IO ()
streamWithInput producer workerInput worker =
  stream_ UnboundThreads producer workerInput worker

-- | Variant of 'stream' that gathers the value returned by the worker for
-- each item into a list.
streamWithOutput
  :: Int
  -> ((a -> IO ()) -> IO ()) -- ^ Producer
  -> (a -> IO c) -- ^ Worker
  -> IO [c]
streamWithOutput maxConcurrency producer worker =
  streamWithInputOutput producer workerSlots (\_ item -> worker item)
  where
    -- One dummy input per worker; the workers ignore it.
    workerSlots = replicate maxConcurrency ()

-- | Like 'streamWithInput', but collects the results of each worker.
--
-- A result slot is allocated for every item as the producer emits it, and
-- the returned list follows the order in which the items were emitted.
-- Slots that were never filled in are skipped.
streamWithInputOutput
  :: ((a -> IO ()) -> IO ()) -- ^ Producer
  -> [b] -- ^ Worker input
  -> (b -> a -> IO c) -- ^ Worker
  -> IO [c]
streamWithInputOutput producer workerInput worker = do
  results <- newIORef []
  let prod write = producer $ \a -> do
        res <- newIORef Nothing
        -- Cons strictly: a lazy 'modifyIORef' would accumulate one thunk
        -- per produced item inside the reference.
        slots <- readIORef results
        writeIORef results $! res : slots
        write (a, res)
  stream_ UnboundThreads prod workerInput $ \s (a, ref) ->
    worker s a >>= writeIORef ref . Just
  -- Slots were consed in reverse emission order, so reverse at the end.
  slots <- readIORef results
  fmap (catMaybes . reverse) (mapM readIORef slots)

-- Shared implementation of the @stream*@ functions.
--
-- One worker is started per element of the worker-input list. Items emitted
-- by the producer travel through a bounded queue whose capacity equals the
-- number of workers. When the producer returns, one 'Nothing' end-marker is
-- written per worker so that every worker terminates.
--
-- With an empty worker-input list no workers are started; this is handled
-- for both thread kinds. NOTE(review): in that case the queue has capacity
-- zero, so a producer that emits any item presumably blocks -- confirm.
stream_
  :: ShouldBindThreads -- use bound threads?
  -> ((a -> IO ()) -> IO ()) -- ^ Producer
  -> [b] -- Worker input
  -> (b -> a -> IO ()) -- ^ Worker
  -> IO ()
stream_ useBoundThreads producer workerInput worker = do
  let maxConcurrency = length workerInput
  q <- atomically $ newTBQueue (fromIntegral maxConcurrency)
  let write x = atomically $ writeTBQueue q (Just x)
  -- Workers are started masked and only unmask around each call to the
  -- user-supplied worker; the producer side runs unmasked.
  mask $ \unmask ->
    concurrently_ (runWorkers unmask q) $ unmask $ do
      -- run the producer
      producer write
      -- write end-markers for all workers
      replicateM_ maxConcurrency $
        atomically $ writeTBQueue q Nothing
  where
    runWorkers unmask q = case useBoundThreads of
      BoundThreads ->
        runAllBound (map (runWorker unmask q) workerInput)
      UnboundThreads ->
        mapConcurrently_ (runWorker unmask q) workerInput

    -- Total equivalent of @foldr1 concurrentlyBound@: identical nesting for
    -- non-empty lists, and a no-op instead of a crash for the empty list.
    runAllBound [] = return ()
    runAllBound [w] = w
    runAllBound (w : ws) = concurrentlyBound w (runAllBound ws)

    -- Run two actions in bound threads and wait for both of them.
    concurrentlyBound l r =
      withAsyncBound l $ \a ->
        withAsyncBound r $ \b ->
          void $ waitBoth a b

    -- Worker loop: take items off the queue until an end-marker arrives.
    runWorker unmask q s = do
      v <- atomically $ readTBQueue q
      case v of
        Nothing -> return ()
        Just t -> do
          unmask (worker s t)
          runWorker unmask q s

-- | Map a function concurrently over a list of values, using at most the
-- given number of worker threads.
mapConcurrentlyBounded
  :: Int -- ^ Maximum concurrency
  -> (a -> IO b) -- ^ Function to map over the input values
  -> [a] -- ^ List of input values
  -> IO [b] -- ^ List of output values
mapConcurrentlyBounded maxConcurrency f input =
  streamWithOutput maxConcurrency feed f
  where
    -- The producer simply emits every input value in list order.
    feed emit = mapM_ emit input

-- | 'mapConcurrentlyBounded' with the list and the function swapped.
forConcurrentlyBounded
  :: Int -- ^ Maximum concurrency
  -> [a] -- ^ List of input values
  -> (a -> IO b) -- ^ Function to map over the input values
  -> IO [b] -- ^ List of output values
forConcurrentlyBounded maxConcurrency input f =
  mapConcurrentlyBounded maxConcurrency f input
8 changes: 6 additions & 2 deletions async.cabal
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: async
version: 2.2.5
version: 2.2.6
-- don't forget to update ./changelog.md!
synopsis: Run IO operations asynchronously and wait for their results

Expand Down Expand Up @@ -81,14 +81,18 @@ library
other-extensions: Trustworthy
exposed-modules: Control.Concurrent.Async
Control.Concurrent.Async.Internal
Control.Concurrent.Async.Warden
Control.Concurrent.Stream
build-depends: base >= 4.3 && < 4.22,
hashable >= 1.1.2.0 && < 1.6,
stm >= 2.2 && < 2.6
stm >= 2.2 && < 2.6,
unordered-containers >= 0.2 && < 0.3
if flag(debug-auto-label)
cpp-options: -DDEBUG_AUTO_LABEL

test-suite test-async
default-language: Haskell2010
ghc-options: -threaded
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: test-async.hs
Expand Down
9 changes: 9 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## Changes in 2.2.6

- Added Control.Concurrent.Stream for processing streams with a fixed
number of workers. Includes a bounded version of mapConcurrently:
mapConcurrentlyBounded.
- Added Control.Concurrent.Async.Warden for a way to create Asyncs that
is more flexible than 'withAsync' but retains the guarantee of cancelling
orphaned threads, unlike 'async'.

## Changes in 2.2.5

- #117: Document that empty for Concurrently waits forever
Expand Down
Loading