A producer & consumer client library for AWS Kinesis

Latest on Hackage:

This package is not currently in any snapshots. If you're interested in using it, we recommend adding it to Stackage Nightly. Doing so will make builds more reliable, and allow stackage.org to host generated Haddocks.

Apache-2.0 licensed by Jon Sterling
Maintained by

This package provides a Producer client for bulk-writing messages to a Kinesis stream through a bounded queue, and a Consumer client for reading messages round-robin from all shards of a Kinesis stream. A rudimentary CLI for the Consumer is also provided.
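To illustrate what reading "round-robin from all shards" means, here is a minimal sketch over plain lists. It is not the library's Consumer: `roundRobin` is a hypothetical name, and each inner list stands in for the records already fetched from one shard.

```haskell
import Data.List (transpose)

-- | Interleave per-shard record lists: take one record from each shard
-- in turn, skipping shards that have run out of records.
-- (Hypothetical helper; real shards are fetched over the network.)
roundRobin :: [[a]] -> [a]
roundRobin = concat . transpose

main :: IO ()
main = print (roundRobin [[1, 2, 3], [10], [20, 21 :: Int]])
-- prints [1,10,20,2,21,3]
```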



This release contains (only) breaking changes to the Consumer CLI.

We have switched to using the configuration-tools package for passing options to the CLI. This makes configuration much more robust, and adds support for using a YAML-formatted configuration file, as well as granularly overriding options in a cascading manner (through multiple configuration files, or by passing command line arguments).

See example_config.yml for an example configuration file. Run kinesis-cli --help to see supported command-line arguments.


This release fixes a bug in the Consumer where a saved state could get clobbered in case no further records were found at the restored point on a shard.


This release contains breaking changes.

  • Replaced the internal use of MonadError with proper open-world exceptions. The exposed operations which may throw exceptions have been adjusted to return m (Either e a); resolves issue #17.

  • [Producer] Implemented a cleaner shutdown policy: previously, the producer would stay alive until the thread on which it was created was killed. Now, the producer is shut down as soon as the scope of withKinesisProducer exits, at which point the queue is flushed, subject to a configurable (and optional) timeout. Resolves issue #19.

  • [Producer] Restructured the monolithic error sum types into many smaller types to better support the standard mode of use for error handling.

  • [Consumer] Removed unused ConsumerError type.

  • [Consumer] Changed batch size to use Natural type rather than Int.

  • [Producer] Parameterized the implementation by an arbitrary queue structure; by default, the producer’s message queue is now implemented using TBMChan to avoid a bug in TQueue which causes live-locking under very heavy load.
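The scoped shutdown behaviour described for the Producer can be sketched roughly as follows. This is not the library's code: `Worker`, `enqueue` and `withWorker` are hypothetical names, the queue is a `TBQueue` from the stm package rather than `TBMChan`, and messages are plain strings. The point is only the shape: a `bracket` whose release step waits for the queue to drain, optionally bounded by a timeout, before killing the background thread.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.STM
import Control.Exception (bracket)
import Control.Monad (forever)
import System.Timeout (timeout)

-- | Hypothetical stand-in for a producer handle.
data Worker = Worker
  { workerQueue   :: TBQueue String
  , workerPending :: TVar Int  -- messages enqueued but not yet handled
  }

-- | Enqueue a message; blocks while the bounded queue is full.
enqueue :: Worker -> String -> IO ()
enqueue w msg = atomically $ do
  writeTBQueue (workerQueue w) msg
  modifyTVar' (workerPending w) (+ 1)

-- | Run an action with a background worker that passes every message to
-- @sink@. When the action's scope exits, wait for outstanding messages
-- (at most the given number of microseconds, or without limit for
-- Nothing), then kill the worker thread.
withWorker :: Maybe Int -> (String -> IO ()) -> (Worker -> IO a) -> IO a
withWorker flushTimeout sink inner = bracket start stop (inner . fst)
  where
    start = do
      w <- Worker <$> newTBQueueIO 1024 <*> newTVarIO 0
      tid <- forkIO (drain w)
      pure (w, tid)
    -- Note: if @sink@ throws, this thread dies and pending never drops.
    drain w = forever $ do
      msg <- atomically (readTBQueue (workerQueue w))
      sink msg
      atomically (modifyTVar' (workerPending w) (subtract 1))
    stop (w, tid) = do
      let waitDrained = atomically $ do
            n <- readTVar (workerPending w)
            check (n == 0)
      _ <- maybe (Just <$> waitDrained) (`timeout` waitDrained) flushTimeout
      killThread tid

main :: IO ()
main = do
  withWorker (Just 1000000) putStrLn $ \w ->
    mapM_ (enqueue w) ["a", "b", "c"]
  putStrLn "worker shut down"
```

Counting pending messages separately from the queue avoids treating a message as flushed while the worker is still handling it.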

Other non-breaking changes:

  • [Producer] A better, more reliable implementation of consuming the input queue in chunks.

  • Removed dependencies on either, errors, hoist-error, stm-conduit; added dependency on enclosed-exceptions.

  • Increased lower bound on monad-control to 1.0 (for hygienic purposes).
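As an illustration of consuming an input queue in chunks, the sketch below blocks until one item is available and then takes whatever else is already queued, up to a limit. It is an assumption about the general technique, not the Producer's actual implementation; `readChunk` is a hypothetical name and `TBQueue` (from stm) stands in for the library's queue type.

```haskell
import Control.Concurrent.STM

-- | Block until at least one item is available, then take up to @limit@
-- items in total without waiting for more to arrive.
readChunk :: Int -> TBQueue a -> STM [a]
readChunk limit queue
  | limit <= 0 = pure []
  | otherwise = do
      first <- readTBQueue queue
      rest <- go (limit - 1)
      pure (first : rest)
  where
    go 0 = pure []
    go n = do
      next <- tryReadTBQueue queue
      case next of
        Nothing -> pure []                -- queue drained; stop early
        Just x  -> (x :) <$> go (n - 1)

main :: IO ()
main = do
  q <- newTBQueueIO 10
  atomically (mapM_ (writeTBQueue q) [1 .. 5 :: Int])
  atomically (readChunk 3 q) >>= print  -- prints [1,2,3]
  atomically (readChunk 3 q) >>= print  -- prints [4,5]
```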


  • Upgrade aws-general, aws-kinesis lower bounds.


  • [Producer] Write errors to stderr rather than stdout.

  • [Consumer CLI] Exit with a failure code (1) when the run is not considered successful (i.e. if a limit was specified, and the CLI failed to retrieve that many records).
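The exit-code rule for the Consumer CLI can be written down as a small pure function. This is a sketch of the rule as stated, not the CLI's source; `exitCodeFor` is a hypothetical name.

```haskell
import System.Exit (ExitCode (..), exitWith)

-- | Decide the process exit code from an optional record limit and the
-- number of records actually retrieved.
exitCodeFor :: Maybe Int -> Int -> ExitCode
exitCodeFor Nothing _ = ExitSuccess        -- no limit: always a success
exitCodeFor (Just limit) retrieved
  | retrieved >= limit = ExitSuccess
  | otherwise          = ExitFailure 1     -- fell short of the limit

main :: IO ()
main = exitWith (exitCodeFor (Just 10) 10)
```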


All changes were in the Producer client.

  • Do not retry messages indefinitely; add support for a configurable retry policy.

  • Synchronously reject messages that exceed the Kinesis record size limit.
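The two Producer changes above might look roughly like this in isolation. Everything here is illustrative: `checkSize`, `Rejected` and `retrying` are hypothetical names, the size limit is passed in as a parameter rather than hard-coded, and the retry policy is reduced to a maximum number of attempts, which is an assumption about what such a policy could contain.

```haskell
import Control.Exception (SomeException, try)
import qualified Data.ByteString.Char8 as B
import Data.IORef (atomicModifyIORef', newIORef)

-- | Reason a message was refused before being queued (carries its size).
newtype Rejected = MessageTooLarge Int
  deriving (Eq, Show)

-- | Reject a message up front if it exceeds @limit@ bytes.
checkSize :: Int -> B.ByteString -> Either Rejected B.ByteString
checkSize limit msg
  | B.length msg > limit = Left (MessageTooLarge (B.length msg))
  | otherwise            = Right msg

-- | Run an action at most @attempts@ times, returning the last failure
-- if every attempt throws. Catching SomeException keeps the sketch
-- short; real code would be more selective.
retrying :: Int -> IO a -> IO (Either SomeException a)
retrying attempts action = do
  result <- try action
  case result of
    Left _ | attempts > 1 -> retrying (attempts - 1) action
    _                     -> pure result

main :: IO ()
main = do
  print (checkSize 4 (B.pack "hello"))
  calls <- newIORef (0 :: Int)
  r <- retrying 3 $ do
    n <- atomicModifyIORef' calls (\c -> (c + 1, c + 1))
    if n < 3 then ioError (userError "transient failure") else pure n
  print (either (const Nothing) Just r)
```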


All changes were in the Consumer CLI.

  • Add a --timeout option, which will cause the consumer CLI to terminate after a specified number of seconds.

  • Change the behavior of --save-state to only save the state when a run of the consumer CLI was successful (i.e. either no limit was specified, or the precise number of items requested was indeed retrieved within the allotted time).


  • Upgrade to newer versions of aws-general and aws-kinesis which support the security token header (necessary for using credentials from EC2 instance metadata). The --use-instance-metadata option should now work properly.

  • When --restore-state is passed with a non-existent file, it will be ignored; if it is passed with a malformed file, an error will be thrown; the old behavior was to throw an error in either case.
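The distinction between a missing and a malformed state file can be sketched as below. The state format is an assumption made for the example (a `show`-rendered list of shard id and sequence number pairs); the CLI's real on-disk format is not described here, and `restoreState` is a hypothetical name.

```haskell
import Control.Exception (throwIO, tryJust)
import Control.Monad (guard)
import System.IO.Error (isDoesNotExistError)
import Text.Read (readMaybe)

-- | Hypothetical saved state: last sequence number seen per shard.
type SavedState = [(String, String)]

-- | Load saved state. A missing file is ignored (Nothing); a file that
-- exists but cannot be parsed raises an error.
restoreState :: FilePath -> IO (Maybe SavedState)
restoreState path = do
  contents <- tryJust (guard . isDoesNotExistError) (readFile path)
  case contents of
    Left () -> pure Nothing
    Right raw -> case readMaybe raw of
      Just st -> pure (Just st)
      Nothing -> throwIO (userError ("malformed state file: " ++ path))

main :: IO ()
main = restoreState "no-such-file.state" >>= print
```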


All changes were in the Consumer CLI.

  • Add switch to get credentials from EC2 instance metadata, --use-instance-metadata

  • Make --limit optional to consume indefinitely

  • Allow custom region with --region

  • If --save-state is passed and the CLI is terminated for any reason, it will record its state before shutting down


  • [Consumer] Add the ability to save & restore stream state (i.e. last consumed sequence number per shard).

  • [Consumer CLI] Add --save-state, --restore-state options


  • Support specifying AWS keys as options to the CLI (either directly or from a file). You must specify one of these options.

  • Remove filtering & date range capabilities from the CLI; remove --raw option, which will now be the only behavior.

  • Fix CLI to print unescaped strings.


  • Add some throttling to the consumer loop to avoid rate limiting
  • Relax the lower-bound on monad-control
  • Upgrade to lens-4.7, add lens-action dependency


Initial release
