Conduit interface for cassava package

BSD3 licensed and maintained by Dom De Re


cassava-conduit


Streaming to CSV is not yet complete: encoding to CSV with a header is not supported at this stage.

Example Usage

The examples project

The examples project contains fuller usage examples, but the gist is:

import Control.Monad.Except (MonadError)
import Control.Monad.Trans.Either (bimapEitherT, runEitherT)
import Control.Monad.Trans.Resource (MonadResource, runResourceT)
import Data.Char (ord)
import Data.Conduit
import Data.Conduit.Binary (sinkFile, sourceFile)
import qualified Data.Conduit.List as CL
import Data.Csv
import Data.Csv.Conduit
import Data.Word (Word8)

data InputRecord = ...

instance FromRecord InputRecord where

data OutputRecord = ...

instance ToRecord OutputRecord where

decodeOpts :: Word8 -> DecodeOptions

encodeOpts :: Word8 -> EncodeOptions

processInput :: InputRecord -> OutputRecord

-- |
--  A Conduit pipeline that streams from '../exampledata/sampleinput.psv', decodes it from a pipe separated format,
--  processes it with 'processInput', then encodes it to pipe separated format and streams it out to '../exampledata/sampleoutput.psv'.
--  The first time it encounters a parse error, it will stop streaming and return the error, also dropping any decoded records that came through in that batch.
conduitPipeline :: (MonadError CsvParseError m, MonadResource m) => m ()
conduitPipeline =
    sourceFile "../exampledata/sampleinput.psv"
    $$  fromCsv (decodeOpts $ fromIntegral $ ord '|') HasHeader
    =$= CL.map processInput
    =$= toCsv (encodeOpts $ fromIntegral $ ord '|')
    =$= sinkFile "../exampledata/sampleoutput.psv"

main :: IO ()
main = do
    res <- runEitherT $ bimapEitherT showError id $ runResourceT conduitPipeline
    either putStrLn return res
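
The gist above gives only the signatures of decodeOpts and encodeOpts. A minimal sketch of how they might be defined, assuming they do nothing more than override the delimiter on cassava's default options (the bodies and the pipeDelimiter helper are assumptions, not taken from the examples project):

```haskell
import Data.Char (ord)
import Data.Csv
    ( DecodeOptions (..)
    , EncodeOptions (..)
    , defaultDecodeOptions
    , defaultEncodeOptions
    )
import Data.Word (Word8)

-- Assumed definition: cassava's default decode options with a custom delimiter.
decodeOpts :: Word8 -> DecodeOptions
decodeOpts delim = defaultDecodeOptions { decDelimiter = delim }

-- Assumed definition: cassava's default encode options with a custom delimiter.
encodeOpts :: Word8 -> EncodeOptions
encodeOpts delim = defaultEncodeOptions { encDelimiter = delim }

-- Hypothetical helper: the pipe character as the Word8 that cassava expects.
pipeDelimiter :: Word8
pipeDelimiter = fromIntegral (ord '|')
```

With these in scope, decodeOpts pipeDelimiter and encodeOpts pipeDelimiter could stand in for the inline fromIntegral/ord expressions in the pipeline.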

Building the examples project

$ cd examples
$ cabal sandbox init
$ cabal sandbox add-source ../
$ cabal install --only-dependencies
$ cabal build

Building the project

./mafia build

Running Unit Tests

./mafia test


0.4.x ->

cassava was bumped from 0.4.* to 0.5.*.

There were some semantic changes going from cassava 0.4.* to 0.5.*; see the cassava changelog.

But I don't think they affect any of the explicit cassava-conduit semantics:

  • QuoteMinimal semantics are not defined by cassava-conduit, so that behaviour will change, but cassava-conduit won't compensate for it (hence the bump from 0.3 to 0.4).
  • cassava-conduit doesn't use foldl'.
  • cassava-conduit doesn't use encodeByNamedWith, so the encIncludeHeader flag shouldn't have any effect.


0.2.2 -> 0.3.0

Some new error types were added, and errors now contain T.Text instead of String.

data CsvParseError =
        CsvParseError BS.ByteString T.Text
    |   IncrementalError T.Text
        deriving (Show, Eq)

-- | When you want to include errors in the stream, this error type represents errors that halt the stream.
-- They do not appear inside the conduit and will instead get returned from running the conduit.
data CsvStreamHaltParseError = HaltingCsvParseError BS.ByteString T.Text -- ^ the remaining bytestring that was read in but not parsed yet, and the stringy error msg describing the fail.
    deriving (Show, Eq)

-- | When you want to include errors in the stream, these are the errors that can be included in the stream,
-- they are usually problems restricted to individual records, and streaming can resume from the next record
-- you just have to decide on something sensible to do with the per record errors.
data CsvStreamRecordParseError = CsvStreamRecordParseError T.Text deriving (Show, Eq) -- ^ The stringy error describing why this record could not be parsed.

The new error types separate errors that halt streaming (which implies that valid records after the failure may be omitted) from errors that can be skipped, allowing the valid records that follow to be processed:

-- |
-- Same as `fromCsv` but allows for errors to be handled in the pipeline instead
fromCsvStreamError :: (FromRecord a, MonadError e m) => DecodeOptions -> HasHeader -> (CsvStreamHaltParseError -> e) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
fromCsvStreamError opts h f = {-# SCC fromCsvStreamError_p #-} streamParser f $ decodeWith opts h

-- |
-- Like `fromNamedCsvStream` but allows for errors to be handled in the pipeline itself.
fromNamedCsvStreamError :: (FromNamedRecord a, MonadError e m) => DecodeOptions -> (CsvStreamHaltParseError -> e) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
fromNamedCsvStreamError opts f = {-# SCC fromCsvStreamError_p #-} streamHeaderParser f $ decodeByNameWith opts
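
To illustrate the split between halting and per-record errors, here is a small sketch that runs fromCsvStreamError over an in-memory ByteString in the Either monad and then separates skipped records from decoded ones. The names decodeLenient and splitResults, the (name, age) record shape and the pipe-delimited input are assumptions made for illustration only; the conduit operators are the ones used elsewhere on this page.

```haskell
import qualified Data.ByteString.Char8 as BC
import Data.Char (ord)
import Data.Conduit (yield, ($$), (=$=))
import qualified Data.Conduit.List as CL
import Data.Csv (DecodeOptions (..), HasHeader (NoHeader), defaultDecodeOptions)
import Data.Csv.Conduit
    ( CsvStreamHaltParseError
    , CsvStreamRecordParseError
    , fromCsvStreamError
    )
import Data.Either (partitionEithers)

-- Hypothetical record shape: a name and an age, pipe separated.
type Person = (BC.ByteString, Int)

-- Decode everything in one go. A halting error becomes the outer Left;
-- per-record errors stay in the list next to the good records.
decodeLenient
    :: BC.ByteString
    -> Either CsvStreamHaltParseError [Either CsvStreamRecordParseError Person]
decodeLenient input =
    yield input
    $$  fromCsvStreamError opts NoHeader id
    =$= CL.consume
  where
    opts = defaultDecodeOptions { decDelimiter = fromIntegral (ord '|') }

-- Separate the records that failed to parse from the ones that decoded.
splitResults
    :: [Either CsvStreamRecordParseError Person]
    -> ([CsvStreamRecordParseError], [Person])
splitResults = partitionEithers
```

For example, fmap splitResults (decodeLenient (BC.pack "alice|30\nbob|oops\ncarol|41\n")) should keep the two well-formed rows and report the middle one as a per-record error rather than aborting the stream.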


0.1.0 -> 0.2.0

fromCsvLiftError :: (FromRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> HasHeader -> Conduit BS.ByteString m a
fromNamedCsvLiftError :: (FromNamedRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> Conduit BS.ByteString m a
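
A sketch of what the lifting function is for: it lets the decoder run in a monad whose error type is your own, by wrapping CsvParseError in it. AppError, its constructors and decodeAll are hypothetical names, and the comma-separated (name, count) rows are an assumed input shape.

```haskell
import qualified Data.ByteString.Char8 as BC
import Data.Conduit (yield, ($$), (=$=))
import qualified Data.Conduit.List as CL
import Data.Csv (HasHeader (NoHeader), defaultDecodeOptions)
import Data.Csv.Conduit (CsvParseError, fromCsvLiftError)

-- Hypothetical application error type that can carry a CSV parse failure.
data AppError
    = CsvFailure CsvParseError
    | OtherFailure String
    deriving (Show, Eq)

-- Decode (name, count) rows, reporting any parse error as an AppError.
decodeAll :: BC.ByteString -> Either AppError [(BC.ByteString, Int)]
decodeAll input =
    yield input
    $$  fromCsvLiftError CsvFailure defaultDecodeOptions NoHeader
    =$= CL.consume
```

Here Either AppError serves as the MonadError instance, so the first parse error ends the stream and comes back as Left (CsvFailure ...).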


0.0.1 -> 0.1.0

fromNamedCsv :: (Show a, Monad m, FromNamedRecord a, MonadError CsvParseError m) => DecodeOptions -> Conduit BS.ByteString m a
fromNamedCsvStreamError :: (Monad m, FromNamedRecord a) => DecodeOptions -> Conduit BS.ByteString m (Either CsvParseError a)