logstash
Logstash client library for Haskell
https://github.com/mbg/logstash#readme
| LTS Haskell 24.16: | 0.1.0.4 | 
| Stackage Nightly 2025-10-23: | 0.1.0.4 | 
| Latest on Hackage: | 0.1.0.4 | 
logstash-0.1.0.4@sha256:f3b49b9a186ea845fef41fe36d29acf8c083ccde7e0cc78a5fd10df09614790e,1388Module documentation for 0.1.0.4
Haskell client library for Logstash
This library implements a client for Logstash in Haskell. The following features are currently supported:
- Connections to Logstash via TCP or TLS (tcpinput type).
- Support for the json_linescodec out of the box and custom codecs can be implemented (arbitraryByteStringdata can be sent across the connections).
- This library can either be used without any logging framework, as a backend for monad-logger, or as a backend forkatip.
- Log messages can either be written synchronously or asynchronously.
For example, to connect to a Logstash server via TCP at 127.0.0.1:5000 (configuration given by def) and send a JSON document synchronously with a timeout of 1s and the default retry policy from Control.Retry:
data Doc = Doc String
instance ToJSON Doc where
    toJSON (Doc msg) = object [ "message" .= msg ]
main :: IO ()
main = runLogstashConn (logstashTcp def) retryPolicyDefault 1000000 $
    stashJsonLine (Doc "Hello World")
Only the tcp input type (with or without TLS) is currently supported. For example, without TLS, the Logstash input configuration should roughly be:
input {
    tcp {
        port => 5000
        codec => "json_lines"
    }
}
With TLS, the expected Logstash configuration should roughly be:
input {
    tcp {
        port => 5000
        ssl_cert => "/usr/share/logstash/tls/cert.pem"
        ssl_key => "/usr/share/logstash/tls/key.pem"
        ssl_key_passphrase => "foobar"
        ssl_enable => true 
        ssl_verify => false
        codec => "json_lines"
    }
}
Configuring connections
Connections to Logstash are represented by the LogstashConnection type. To connect to Logstash via tcp use the Logstash.TCP module which exports four principal functions. Note that none of these functions establish any connections when they are called - instead, they allow runLogstashConn and runLogstashPool to establish connections/reuse them as needed:
- logstashTcpwhich, given a hostname and a port, will produce an- Acquirethat can be used with- runLogstashConn.
- logstashTcpPoolwhich, given a hostname and a port, will produce a- Poolthat can be used with- runLogstashPool.
- logstashTlswhich, given a hostname, a port, and TLS client parameters, will produce an- Acquirethat can be used with- runLogstashConn.
- logstashTlsPoolwhich, given a hostname, a port, and TLS client parameters, will produce a- Poolthat can be used with- runLogstashPool.
For logstashTls and logstashTlsPool, TLS ClientParams are required. It is worth noting that the defaultParamsClient function in the tls package does not set any supported ciphers and does not load the system trust store by default. For relatively sane defaults, it is worth using newDefaultClientParams from network-simple-tls instead. For example:
main :: IO ()
main = do 
    params <- newDefaultClientParams ("127.0.0.1", "")
    runLogstashConn (logstashTls def params) retryPolicyDefault 1000000 $ 
        stashJsonLine myDocument
Logging things
The Logstash module exports functions for synchronous and asynchronous logging. Synchronous logging is acceptable for applications or parts of applications that are largely single-threaded where blocking on writes to Logstash is not an issue. For multi-threaded applications, such as web applications or services, you may wish to write log messages to Logstash asynchronously instead. In the latter model, log messages are added to a bounded queue which is processed asynchronously by worker threads.
Synchronously
The logging functions exported by the Logstash module are backend-independent can be invoked synchronously with runLogstash, which is overloaded to work with either Acquire LogstashConnection or LogstashPool (Pool LogstashConnection) values and maps to one of the two implementations described below. In either case, you must supply a retry policy and a timeout (in microseconds). The retry policy determines whether performing the logging action should be re-attempted if an exception occurs. The order of operations is:
- The retry policy is applied.
- A connection is established using the provided Logstash context.
- The timeout is applied.
- The Logstash computation is executed.
If the computation is successful, each step will only be executed once. If an exception is raised by the computation or the timeout, the connection to the Logstash server is terminated and the exception propagated to the retry policy. If the retry policy determines that the computation should be re-attempted, steps 2-4 will happen again. The timeout applies to every attempt individually and should be chosen appropriately in conjunction with the retry policy in mind.
Depending on whether the Logstash context is a Acquire LogstashConnection value or a LogstashPool (Pool LogstashConnection) value, the runLogstash functions maps to one of:
- runLogstashConnfor- Acquire LogstashConnection(e.g. the result of- logstashTcpor- logstashTls).
- runLogstashPoolfor- Pool LogstashConnection(e.g. the result of- logstashTcpPoolor- logstashTlsPool). If a connection is available in the pool, that connection will be used. If no connection is available but there is an empty space in the pool, a new connection will be established. If neither is true, this function blocks until a connection is available. The computation that is provided as the second argument is then run with the connection. In the event of an exception, the connection is not returned to the pool.
Stashing things by hand
The following functions allow sending data synchronously via the Logstash connection:
- stashis a general-purpose function for sending- ByteStringdata to the server. No further processing is performed on the data.
- stashJsonLineis for use with the- json_linecodec. The argument is encoded as JSON and a- \ncharacter is appended, which is then sent to the server.
Any exception raised by the above stashing functions will likely be due to a bad connection. The runLogstash functions apply the retry policy before establishing a connection, so in the event that an exception is raised, a new connection will be established for the next attempt.
Asynchronously
The withLogstashQueue function is used for asynchronous logging. When called, it sets up a bounded queue that is then used to communicate log messages to worker threads which dispatch them to Logstash. A minimal example with default settings is shown below:
data Doc = Doc String
instance ToJSON Doc where
    toJSON (Doc msg) = object [ "message" .= msg ]
main :: IO ()
main = do
    let ctx = logstashTcp def
    let cfg = defaultLogstashQueueCfg ctx
    
    withLogstashQueue cfg (const stashJsonLine) [] $ \queue -> do
        atomically $ writeTBMQueue queue (Doc "Hello World")
The [] given to withLogstashQueue allows installing exception handlers that are used to handle the case where a log message has exhausted the retry policy. This can e.g. be used to fall back to the standard output for logging as a last resort and to stop the worker thread from getting terminated by an exception that may be recoverable.
The queue is automatically closed when the inner computation returns. The worker threads will continue running until the queue is empty and then terminate. withLogstashQueue will not return until all worker threads have returned.
Usage with monad-logger
The monad-logger-logstash package provides convenience functions and types for working with monad-logger.
Synchronous logging
The following example demonstrates how to use the runLogstashLoggingT function with a TCP connection to Logstash, the default retry policy from Control.Retry, a 1s timeout for each attempt, and the json_lines codec:
main :: IO ()
main = do 
    let ctx = logstashTcp def
    runLogstashLoggingT ctx retryPolicyDefault 1000000 (const stashJsonLine) $ 
        logInfoN "Hello World"
Each call to a logging function such as logInfoN in the example will result in the log message being written to Logstash synchronously.
Asynchronous logging
The withLogstashLoggingT function is the analogue of withLogstashQueue for monad-logger. It performs the same setup as withLogstashQueue, but automatically adds all log messages from logging functions to the queue. A minimal example with default settings is:
main :: IO ()
main = do 
    let ctx = logstashTcp def
    withLogstashLoggingT (defaultLogstashQueueCfg ctx) (const stashJsonLine) [] $ 
        logInfoN "Hello World"
While withLogstashLoggingT is useful for scenarios where there is a single producer for which log messages should be dispatched asynchronously, we may wish to share the same queue among several producers. For such applications, the runTBMQueueLoggingT in combination with withLogstashQueue is a better fit:
main :: IO ()
main = do 
    let ctx = logstashTcp def
    let cfg = defaultLogstashQueueCfg ctx
    withLogstashQueue cfg (const stashJsonLine) [] $ \queue -> do
        thread <- async $ runTBMQueueLoggingT queue $ do
            liftIO $ threadDelay (60*1000*1000)
            logInfoN "I am consumer #2" 
        
        runTBMQueueLoggingT queue $ do 
            logInfoN "I am consumer #1"
        wait thread
Usage with katip
The katip-logstash package provides convenience functions and types for working with katip.
Asynchronous logging
The withLogstashScribe function is the analogue of withLogstashQueue for katip. It performs the same setup as withLogstashQueue, but provides a Scribe instead of the raw queue. A minimal example with default settings is (adapted from the katip documentation):
main :: IO ()
main = do 
    let ctx = logstashTcp def
    withLogstashScribe (defaultLogstashQueueCfg ctx) (const $ pure True) (itemJson V3) (const stashJsonLine) [] $ \logstashScribe -> do
        let makeLogEnv = registerScribe "logstash" logstashScribe defaultScribeSettings =<< initLogEnv "MyApp" "production"
        bracket makeLogEnv closeScribes $ \le -> do
            let initialContext = ()
            let initialNamespace = "main"
            runKatipContextT le initialContext initialNamespace $ do
                $(logTM) InfoS "Hello World"
Changes
Changelog for logstash
v0.1.0.4
- Compatibility with GHC 9.6
v0.1.0.1
- Fixes a bug which caused LogstashConnections from aLogstashPoolto not get released properly inrunLogstashPool
v0.1
- First release with support for connecting to tcp(incl. TLS) Logstash inputs and thejson_linescodec.
