Hoogle Search
Within LTS Haskell 24.46 (ghc-9.10.3)
Note that Stackage only displays results for the latest LTS and Nightly snapshot. Learn more.
trimapPipeF :: (i -> j) -> (p -> o) -> (u -> v) -> PipeF j p v a -> PipeF i o u aconduino Data.Conduino.Internal No documentation available.
commaParser :: Char -> Parser ()conduit-aeson Data.Conduit.Aeson Use a comma for delimiter.
-
conduit-algorithms Data.Conduit.Algorithms.Async This is like map, except that each element is processed in a separate thread (up to maxThreads can be queued up at any one time). Results are evaluated to normal form (not weak-head normal form!, i.e., the structure is deeply evaluated) to ensure that the computation is fully evaluated in the worker thread. Note that there is some overhead in threading. It is often a good idea to build larger chunks of input before passing it to asyncMapC to amortize the costs. That is, when f is not a lot of work, instead of asyncMapC f, it is sometimes better to do
CC.conduitVector 4096 .| asyncMapC (V.map f) .| CC.concat
where CC refers to Combinators See unorderedAsyncMapC -
conduit-algorithms Data.Conduit.Algorithms.Async asyncMapC with error handling. The inner function can now return an error (as a Left). When the first error is seen, it throwErrors in the main monad. Note that f may be evaluated for arguments beyond the first error (as some threads may be running in the background and already processing elements after the first error). See asyncMapC
-
conduit-algorithms Data.Conduit.Algorithms.Async A version of asyncMapC which can reorder results in the stream If the order of the results is not important, this function can lead to a better use of resources if some of the chunks take longer to complete. See asyncMapC
-
conduit-algorithms Data.Conduit.Algorithms.Async.ByteString Apply a function to groups of lines Note that this is much more efficient than the (more or less equivalent, except that the intermediate lists can be of varying sizes):
CB.lines .| CC.conduitVector N .| CAlg.asyncMapC nthreads (f . V.toList)
The reason being that splitting into lines then becomes the bottleneck and processing a single line is typically a tiny chunk of work so that the threading overhead overwhelms the advantage of using multiple cores. Instead, asyncMapLineGroupsC will pass big chunks to the worker thread and perform most of the line splitting _in the worker thread_. Only Unix-style ASCII lines are supported (splitting at Bytes with value 10, i.e., \n). When Windows lines (\r\n) are passed to this function, this results in each element having an extra \r at the end. package
conduit-concurrent-map Concurrent, order-preserving mapping Conduit Provides a Conduit that maps a function concurrently over incoming elements, maintaining input order.
module Data.Conduit.
ConcurrentMap Functions for concurrent mapping over Conduits.
-
conduit-concurrent-map Data.Conduit.ConcurrentMap concurrentMapM numThreads workerOutputBufferSize f
Concurrent, order-preserving conduit mapping function. Like mapM, but runs in parallel with the given number of threads, returns outputs in the order of inputs (like mapM, no reordering), and allows defining a bounded size output buffer for elements of type b to maintain high parallelism despite head-of-line blocking. Because of the no-reordering guarantee, there is head-of-line blocking: When the conduit has to process a long-running computation and a short-running computation in parallel, the result of short one cannot be yielded before the long one is done. Unless we buffer the queued result somewhere, the thread that finished the short-running computation is now blocked and sits idle (low utilisation). To cope with this, this function gives each thread workerOutputBufferSize output slots to store bs while they are blocked. Use the convenience concurrentMapM_numCaps when f is CPU-bound. workerOutputBufferSize must be given >= 1. The workerOutputBufferSize keeps the memory usage of the conduit bounded, namely to numThreads * (workerOutputBufferSize + 1) many bs at any given time (the + 1 is for the currently processing ones). To achieve maximum parallelism/utilisation, you should choose workerOutputBufferSize ideally as the time factor between the fastest and slowest f that will likely pass through the conduit; for example, if most fs take 3 seconds, but some take 15 seconds, choose workerOutputBufferSize = 5 to avoid an earlier 15-second f blocking a later 3-second f. The threads inside the conduit will evaluate the results of the f to WHNF, as in !b <- f a, so don't forget to make f itself deepseq the result if there is any lazy data structure involved and you want to make sure that they are evaluated *inside* the conduit (fully in parallel) as opposed to the lazy parts of them being evaluated after being yielded. As fs happen concurrently, they cannot depend on each other's monadic state. This is enforced by the MonadUnliftIO constraint. This means the function cannot be used with e.g. StateT. Properties:- Ordering / head of line blocking for outputs: The bs will come out in the same order as their corresponding as came in (the parallelism doesn't change the order).
- Bounded memory: The conduit will only hold to numThreads * (workerOutputBufferSize + 1) as many bs.
- High utilisation: The conduit will try to keep all cores busy as much as it can. This means that after awaiting an input, it will only block to wait for an output from a worker thread if it has to because we're at the workerOutputBufferSize output buffer bound of b elements. (It may, however, yield even if the queue is not full. Since yield will block the conduit's thread until downstream conduits in the pipeline await, utilisation will be poor if other conduits in the pipeline have low throughput. This makes sense because a conduit pipeline's total throughput is bottlenecked by the segment in the pipeline.) It also ensures that any worker running for longer than others does not prevent other free workers from starting new work, except from when we're at the workerOutputBufferSize output buffer bound of b elements.
- Prompt starting: The conduit will start each awaited value immediately, it will not batch up multiple awaits before starting.
- Async exception safety: When then conduit is killed, the worker threads will be killed too.
puts :: (MonadIO m) => String -> m () -- for non-interleaved output puts s = liftIO $ BS8.putStrLn (BS8.pack s) runConduitRes (CL.sourceList [1..6] .| concurrentMapM_ 4 (\i -> liftIO $ puts (show i ++ " before") >> threadDelay (i * 1000000) >> puts (show i ++ " after") >> return (i*2)) .| CL.consume )
-
conduit-concurrent-map Data.Conduit.ConcurrentMap Deprecated: Use concurrentMapM (without _), as this function yields b; in the future, this function may be replaced by one that does not yield b