nakadi-client is a BSD2/BSD3-licensed Haskell client library for the Nakadi event broker developed by Zalando. Its streaming interfaces are built on top of Conduit.

Please note that the API is not considered stable yet.

nakadi-client provides:

  • A Docker-based test suite that runs against the official Nakadi Docker image.

  • A type-safe API for interacting with Nakadi. For example, the name of an event type has type EventTypeName rather than Text or another generic type. Dedicated types are also provided for values such as CursorOffset, which must be treated as opaque strings.

  • An integrated and configurable retry mechanism.

  • Conduit-based interfaces for streaming events.

  • Support for temporary subscriptions.

  • A convenient interface to the Subscription API (subscriptionProcess and subscriptionProcessConduit), which frees the user from any manual bookkeeping.

  • A mechanism for registering callbacks for logging and token injection.
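
To illustrate the type-safety point above, here is a minimal, self-contained sketch. The newtypes and helper functions are hypothetical stand-ins written for this example, assuming String as the underlying representation; they are not the definitions exported by nakadi-client.

```haskell
-- Illustrative only: hypothetical stand-ins, not the library's own types.
newtype EventTypeName = EventTypeName String deriving (Eq, Show)
newtype CursorOffset  = CursorOffset String deriving (Eq, Show)

-- Hypothetical helper that accepts only an event type name.
describeEventType :: EventTypeName -> String
describeEventType (EventTypeName name) = "event type: " ++ name

-- Offsets are opaque: they can be compared for equality and handed back
-- to the server, but no arithmetic is offered on them.
sameOffset :: CursorOffset -> CursorOffset -> Bool
sameOffset = (==)

-- The following line would be rejected by the type checker, because a
-- CursorOffset is not an EventTypeName:
-- describeEventType (CursorOffset "001")
```

Because each value has its own wrapper type, mixing up an event type name and a cursor offset becomes a compile-time error instead of a runtime bug.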


Example code showing how to dump a subscription:

dumpSubscription :: (MonadLogger m, MonadNakadi IO m) => Nakadi.SubscriptionId -> m ()
dumpSubscription subscriptionId =
  Nakadi.subscriptionProcess Nothing subscriptionId processBatch

  where processBatch :: MonadLogger m => Nakadi.SubscriptionEventStreamBatch Value -> m ()
        processBatch batch =
          logInfoN (tshow batch)


Nakadi-Client Change Log


  • Nakadi business events are supported.

  • Support for concurrent workers consuming a subscription. This feature is activated using the function setWorkerThreads.

  • The modelling of the subscription API has been simplified by differentiating between SubscriptionRequest (before creation) and Subscription (after creation).

  • An experimental API for creating a Conduit event source for a given subscription is included, which makes it possible to emulate the old low-level API on top of the Subscription API.

  • A new function withSubscription is included, providing an interface for creating a subscription and automatically passing its subscription ID to some user-provided action.

  • A new function withTemporarySubscription is included, which is very similar to withSubscription, but with the crucial difference that the subscription is automatically deleted after the user-provided action has terminated.

  • Support for the new show_time_lag field when retrieving subscription statistics has been added.

  • An experimental MonadNakadi instance for the IO monad using a global Nakadi configuration has been added. The new module Network.Nakadi.Unsafe.IO exposes functionality for accessing this global Nakadi configuration. The new IO instance makes it possible, for example, to evaluate Nakadi calls interactively in GHCi without running any monad transformers.

  • A new convenience function newConfigFromEnv is exposed, which allows creating a Nakadi configuration with the Nakadi service URL being derived automatically from the environment variable NAKADI_URL.
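
The "create, run the action, then delete" behaviour described for withTemporarySubscription can be sketched with bracket from Control.Exception. This is only an illustration of the general pattern under stated assumptions: the registry, createSub, deleteSub and withTemporarySub are hypothetical names invented for this example, an in-memory IORef stands in for the Nakadi server, and nothing here is claimed to match how nakadi-client implements the function.

```haskell
import Control.Exception (bracket)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for a subscription ID.
newtype SubscriptionId = SubscriptionId Int deriving (Eq, Show)

-- Hypothetical in-memory registry of live subscriptions; it plays the
-- role of the Nakadi server in this sketch.
type Registry = IORef [SubscriptionId]

-- Register a new subscription and return its ID.
createSub :: Registry -> IO SubscriptionId
createSub reg = do
  subs <- readIORef reg
  let sid = SubscriptionId (length subs + 1)
  modifyIORef' reg (sid :)
  pure sid

-- Remove a subscription from the registry.
deleteSub :: Registry -> SubscriptionId -> IO ()
deleteSub reg sid = modifyIORef' reg (filter (/= sid))

-- Create a subscription, pass its ID to the action, and delete the
-- subscription afterwards, even if the action throws an exception.
withTemporarySub :: Registry -> (SubscriptionId -> IO a) -> IO a
withTemporarySub reg = bracket (createSub reg) (deleteSub reg)
```

In this sketch, a call such as withTemporarySub reg print shows the generated ID while the subscription exists, and the registry no longer contains that ID once the call returns. Using bracket means the cleanup step also runs when the user-provided action fails.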

