thread-supervisor
A simplified implementation of Erlang/OTP like supervisor over thread
https://github.com/nshimaza/thread-supervisor#readme
| LTS Haskell 24.16: | 0.2.0.0 |
| Stackage Nightly 2025-10-24: | 0.2.0.0 |
| Latest on Hackage: | 0.2.0.0 |
thread-supervisor-0.2.0.0@sha256:3a8c7f941c97bbf27660c530bf48c8baa78e48f6cda5c5efebea6d0e82250086,2037
Module documentation for 0.2.0.0
- Control
- Control.Concurrent
- Data
Overview
This package provides Erlang/OTP-like thread supervision. It provides automatic restart, escalation of intense crashes, and guaranteed cleanup of child threads on supervisor termination.
Motivation
Unlike Unix processes, plain Haskell threads created by forkIO have no parent-child relationship in their lifecycle management. Terminating a parent thread does not terminate its children. This is a good design for a low-level API because it gives the user the greatest flexibility. However, it also means that managing the entire lifecycle of a thread is entirely the user’s responsibility.
There is one thing you need to be aware of here. Garbage collection doesn’t apply to a living thread. When you lose the reference to an object, the garbage collector frees the object for you. However, even if you lose the thread ID of your child thread, the Haskell runtime doesn’t consider the thread orphaned. The child thread continues running.
This easily leads to thread leakage. You can accidentally lose the thread ID of a child thread when the parent thread crashes. You then have no way to kill the orphaned child thread. This is thread leakage.
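The leak described above can be reproduced with nothing but base. The following is a minimal sketch, not part of this package; leakDemo and ticks are names made up for illustration. A parent thread forks a child, discards its thread ID, and exits, while the child keeps incrementing a counter.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forever)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)

-- | Hypothetical helper: True when the child was observed running after
-- its parent had finished.
leakDemo :: IO Bool
leakDemo = do
    ticks <- newIORef (0 :: Int)
    parentDone <- newEmptyMVar
    -- Parent thread: forks a child, throws away its ThreadId, then finishes.
    _ <- forkIO $ do
        _ <- forkIO . forever $ do
            atomicModifyIORef' ticks (\n -> (n + 1, ()))
            threadDelay 10000            -- tick every 10 ms
        putMVar parentDone ()            -- last action of the parent
    takeMVar parentDone
    before <- readIORef ticks
    threadDelay 200000                   -- give the orphan 200 ms to keep ticking
    after <- readIORef ticks
    pure (after > before)

main :: IO ()
main = do
    leaked <- leakDemo
    putStrLn ("child still running after parent exit: " ++ show leaked)
```

Nothing in this program holds the child’s thread ID any more, so nothing can kill it short of ending the whole process.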
The low-level forkIO API requires you to keep track of and manage the entire thread lifecycle, including accidental cases like the one above. Hand-crafting this can be painful.
This package is intended to provide a better wrapper API over plain forkIO. In addition to parent-child thread lifecycle management, it provides an Erlang/OTP-like API so that users can leverage well-proven practices from Erlang/OTP.
If you need to keep your child running after its parent has terminated, this package is not for you.
Why not withAsync?
In short, withAsync addresses a different problem than this package.
- withAsync: Accessing multiple REST servers concurrently, then gathering all responses, with guaranteed cancellation of all requests on termination of the calling thread.
- thread-supervisor: Implementing a server where an unknown number of independent, concurrent requests with indeterministic lifecycles will arrive.
A typical use case for this package is a TCP server. In such a use case, you have to create an unpredictable number of threads to serve clients, and those threads finish at unpredictable times.
The withAsync function from the async package solves a different problem than this
package. It is good for running actions asynchronously when you eventually need
their return values. Even if you don’t care about the return values, it is suited
only to running a small, finite number of actions concurrently.
Below is an explanation of why withAsync is not good for managing a large number of
threads.
withAsync is essentially sugar over the bracket pattern, like this.
withAsync action inner = bracket (async action) uninterruptibleCancel inner
It guarantees that uninterruptibleCancel is applied to the action when an
asynchronous exception occurs in the parent thread where withAsync itself is
running. However, it also guarantees that uninterruptibleCancel is executed on
normal exit from inner. Thus, the action can only live within the
lifecycle of the withAsync call. If you want to keep your action alive, you
have to keep inner running until your action finishes.
So, what if you kick off an async action and make a recursive call from inner back
to your loop? It is a bad idea. Because withAsync is a bracket, a recursive
call from inner is not a tail call. It consumes stack every time you
recurse.
In other words, the difference between withAsync and thread-supervisor is
the strategy for installing and uninstalling the cleanup handler. withAsync installs
its cleanup handler on the stack, so it uninstalls the handler based on lexical scope.
thread-supervisor installs its cleanup handler around the user-supplied action, so
it uninstalls the handler at actual dynamic thread termination.
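The lexical scoping described above can be observed with a bracket written against base alone. This is a sketch under the assumption that forkIO and killThread can stand in for async and uninterruptibleCancel; withChild and scopeDemo are hypothetical names, not part of async or of this package.

```haskell
import Control.Concurrent (ThreadId, forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (bracket, finally)

-- | Simplified stand-in for withAsync: the child lives only while inner runs.
withChild :: IO () -> (ThreadId -> IO a) -> IO a
withChild action = bracket (forkIO action) killThread

-- | Returns the result of inner after confirming that the child was killed
-- when control left the bracket.
scopeDemo :: IO String
scopeDemo = do
    childExited <- newEmptyMVar
    -- The child would sleep for a minute if nobody killed it.
    let child = threadDelay 60000000 `finally` putMVar childExited ()
    result <- withChild child $ \_ -> pure "inner finished"
    -- Leaving withChild ran killThread, so the child's finalizer fires soon.
    takeMVar childExited
    pure result

main :: IO ()
main = scopeDemo >>= putStrLn
```

The child cannot outlive the call to withChild, which is exactly why a long-running server loop cannot be built by recursing from inside inner.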
Quick Start
High level steps to use
- Create a MonitoredAction from your IO action
- Create a ChildSpec from the MonitoredAction
- Let a supervisor run the ChildSpec in a supervised thread
The details differ depending on whether you create a static child thread or a dynamic child thread.
Create a static child
A static child is a thread automatically spawned when the supervisor starts. The following procedure makes your IO action a static child.
- Create a MonitoredAction from your IO action
- Create a ChildSpec from the MonitoredAction
- Give the ChildSpec to newSupervisor
- Run the generated supervisor
Static children are automatically forked when the supervisor starts or when a one-for-all supervisor performs a restart. When the IO action inside a static child terminates, whether by normal completion or by exception, the supervisor checks whether a restart is needed based on the restart type of the terminated child and the reason for termination. If the supervisor decides a restart is needed, it performs the restart according to its restart strategy, which can be one-for-one or one-for-all.
A supervisor can have any number of static children. Static children must be
given when the supervisor is created by newSupervisor.
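The effect of the two restart strategies can be sketched as a pure function. This assumes the usual Erlang/OTP meaning of one-for-one (only the terminated child is restarted). SketchStrategy and childrenToRestart are hypothetical names used for illustration and are not this package’s types.

```haskell
-- | Hypothetical stand-in for the supervisor's restart strategy.
data SketchStrategy = SketchOneForOne | SketchOneForAll

-- | Given the strategy, all static children and the child that terminated,
-- list the children the supervisor would start again.
childrenToRestart :: SketchStrategy -> [String] -> String -> [String]
childrenToRestart SketchOneForOne _        failed = [failed]
childrenToRestart SketchOneForAll everyone _      = everyone

main :: IO ()
main = do
    let children = ["logger", "listener", "cache"]
    print (childrenToRestart SketchOneForOne children "listener")
    -- ["listener"]
    print (childrenToRestart SketchOneForAll children "listener")
    -- ["logger","listener","cache"]
```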
Static child example
The following code creates a supervisor actor with two static children and runs it in a new thread.
runYourSupervisorWithStaticChildren = do
    Actor svQ svAction <- newActor . newSupervisor $ OneForAll def
        [ newChildSpec Permanent yourIOAction1
        , newChildSpec Permanent yourIOAction2
        ]
    async svAction
The idiom newActor . newSupervisor returns Actor svQ svAction, where svQ is the
write-end of the message queue for the supervisor actor, which we don’t use here,
and svAction is the body IO action of the supervisor. When svAction is
actually evaluated, it automatically forks two threads: one for
yourIOAction1 and the other for yourIOAction2. Because the restart type of
both static children is Permanent, the supervisor always triggers a
restart operation when either yourIOAction1 or yourIOAction2
terminates. When a restart operation is triggered, the supervisor kills the remaining
thread and restarts all children because its restart strategy is
one-for-all.
When the supervisor is terminated, both yourIOAction1 and yourIOAction2 are
automatically killed by the supervisor. To kill the supervisor, apply cancel
to the async object returned by async svAction.
Create a dynamic child
A dynamic child is a thread explicitly forked via the newChild function. The following
procedure runs your IO action as a dynamic child.
- Run a supervisor
- Create a ChildSpec from your IO action
- Request the supervisor to create a dynamic child based on the ChildSpec by calling newChild
Dynamic children are explicitly forked, each in its own thread, via a newChild request to
a running supervisor. The supervisor never restarts a dynamic child. It ignores
the restart type defined in the ChildSpec of a dynamic child.
Dynamic child example
The following code runs a supervisor in a different thread and then requests it to run a dynamic child.
-- Run supervisor in another thread
Actor svQ svAction <- newActor $ newSimpleOneForOneSupervisor
asyncSv <- async svAction
-- Request to run your action under the supervisor
let yourChildSpec = newChildSpec Temporary yourIOAction
maybeChildThreadId <- newChild def svQ yourChildSpec
The idiom newActor $ newSimpleOneForOneSupervisor returns Actor svQ svAction,
where svQ is the write-end of the message queue for the supervisor actor and
svAction is the body IO action of the supervisor. When svAction is actually
evaluated, it listens on svQ and waits for requests to run dynamic children.
When newChild is called with svQ, it sends a request to the supervisor to run
a dynamic child with the given ChildSpec.
When the supervisor is terminated, requested children are automatically killed by the supervisor if they are still running.
To kill the supervisor, apply cancel to asyncSv.
Building Blocks
This package consists of the following building blocks.
- Actor and Message queue
- Monitored IO action and supervisable IO action
- Behaviors (state machine, server, and supervisor)
Actor and message queue form the lowest layer of this package. Behaviors are built upon it. It is exposed so that you can use it to implement actor-style concurrent programs.
Monitored IO action is the heart of this package. It implements the most sensitive part of dealing with asynchronous exceptions. A monitored IO action provides guaranteed notification on thread termination so that a supervisor can provide guaranteed supervision of threads.
Behaviors (state machine, server, and supervisor) implement simplified Erlang/OTP behaviors so that users can leverage best practices of concurrent programming from Erlang/OTP.
Actor and Message queue
An actor is a restartable IO action with an inbound message queue. Actors are designed so
that other threads sending messages to an actor can keep using the same write-end
of the queue before and after a restart of the actor. An actor consists of a message
queue and its handler. Inbox is a message queue designed to be an actor’s message
inbox. It is a thread-safe, bounded or unbounded, selectively readable queue.
To protect the read-end of the queue, separate types are given to the read-end and
the write-end. The message handler of an actor can access both ends, but only the write-end
is accessible from outside the message handler. To realize this, the constructors of
the message queue are not exposed. The only way to create a new Inbox object is
to create a new actor using the newActor function.
newActor :: (Inbox message -> IO result) -> IO (Actor message result)
This package provides type synonym for message handler as below.
type ActorHandler message result = (Inbox message -> IO result)
newActor receives a user-supplied message handler, creates a new Inbox
value, then returns the write-end of the actor’s message queue and the IO action of the
actor’s body, wrapped in Actor. Actor is defined as follows.
data Actor message result = Actor
    { actorQueue  :: ActorQ message -- ^ Write end of message queue of 'Actor'
    , actorAction :: IO result      -- ^ IO action to execute 'Actor'
    }
The ActorQ message in the Actor is the write-end of the created Inbox. While
the user-supplied message handler receives the Inbox, which is the read-end of the created
queue, the caller of newActor gets the write-end only.
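The split between read-end and write-end can be sketched with base alone. The following is an illustration of the idea, not this package’s implementation: ReadEnd, WriteEnd, SketchActor and the functions around them are hypothetical stand-ins for Inbox, ActorQ, Actor, newActor, send and receive, built on Chan. In a real module the newtype constructors would not be exported.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Two views of the same queue; hiding the constructors is what keeps
-- outside code from reading the queue.
newtype ReadEnd a = ReadEnd (Chan a)
newtype WriteEnd a = WriteEnd (Chan a)

data SketchActor message result = SketchActor
    { sketchQueue  :: WriteEnd message
    , sketchAction :: IO result
    }

-- | The handler gets the read-end, the caller gets only the write-end.
newSketchActor :: (ReadEnd message -> IO result) -> IO (SketchActor message result)
newSketchActor handler = do
    chan <- newChan
    pure (SketchActor (WriteEnd chan) (handler (ReadEnd chan)))

sendSketch :: WriteEnd a -> a -> IO ()
sendSketch (WriteEnd chan) = writeChan chan

receiveSketch :: ReadEnd a -> IO a
receiveSketch (ReadEnd chan) = readChan chan

-- | Example handler: sums numbers until Nothing arrives.
sumHandler :: ReadEnd (Maybe Int) -> IO Int
sumHandler inbox = go 0
  where
    go acc = do
        message <- receiveSketch inbox
        case message of
            Just n  -> go (acc + n)
            Nothing -> pure acc

actorDemo :: IO Int
actorDemo = do
    SketchActor queue action <- newSketchActor sumHandler
    result <- newEmptyMVar
    _ <- forkIO (action >>= putMVar result)
    mapM_ (sendSketch queue . Just) [1, 2, 3]
    sendSketch queue Nothing
    takeMVar result

main :: IO ()
main = actorDemo >>= print   -- 6
```

Because the action closes over the queue, running it again after a crash reads from the same queue, which is the restart property described above.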
Message Queue
Inbox is a queue designed specifically for implementing actors. All behaviors
available in this package depend on it. It provides the following capabilities.
- Thread-safe read and write.
- Blocking and non-blocking read operation.
- Selective read operation.
- Current queue length.
- Bounded queue.
The type Inbox is intended to be used only on the reading side, as the inbox of an actor.
A single Inbox object is readable only from a single actor. To prevent
other actors from reading it, no constructors are exposed; you can get
one only via newActor or newBoundedActor.
Read the oldest message from Inbox
To read the message at the head of the message queue, apply receive to the Inbox. If
one or more messages are available, receive returns the oldest one. If no message
is available, receive blocks until at least one message arrives. A skeleton
of an actor message handler looks like this.
myActorHandler :: Inbox YourMessageType -> IO ()
myActorHandler inbox = do
    newMessage <- receive inbox
    doSomethingWith newMessage
    myActorHandler inbox
Send a message to an actor
To send a message to an actor, call send with the write-end of the actor’s inbox
and the message.
send :: ActorQ message -> message -> IO ()
ActorQ is the write-end of an actor’s message queue. ActorQ is actually just a
wrapper around Inbox. Its role is to hide the read-end API of Inbox. From outside
the actor, only the write-end is exposed via ActorQ. From inside the actor, both
the read-end and the write-end are available. You can read from the given inbox directly.
You can write to the given inbox with sendToMe.
Send a message from an actor to itself
When you need to send a message from an actor to the actor itself, call
sendToMe.
sendToMe :: Inbox message -> message -> IO ()
The following code demonstrates what an entire actor handler looks like.
myActorHandler :: Inbox YourMessageType -> IO ()
myActorHandler inbox = do
    newMessage <- receive inbox
    doSomethingWith newMessage
    sendToMe inbox messageToMyself -- Send a message to itself.
    myActorHandler inbox
Actor
An actor is an IO action emulating Erlang’s actor. It has a dedicated Inbox and
processes incoming messages until it reaches its end state.
An actor is restartable without replacing its message queue. When an actor’s IO action crashes and is restarted, the new execution of the IO action continues to refer to the same message queue. Thus, threads sending messages to the actor can continue using the same write-end of the queue.
newActor and newBoundedActor create an actor with a new Inbox. They are the only
exposed way to create a new Inbox. This limitation is intentional. It prevents
any code other than the message handler of the actor from reading the inbox.
From the perspective outside the actor, the user supplies an IO action of type
ActorHandler to newActor or newBoundedActor, then gets the IO action of the
created actor and the write-end of the actor’s message queue, which is a value of
type ActorQ.
From the perspective inside the actor, in other words from the perspective of the user-supplied message handler, both the read and write sides of the message queue are available.
Shared Inbox
You can run a created actor multiple times simultaneously, each in a different
thread. In that case, all the actor instances share a single Inbox. This can be
useful for distributing a task stream to multiple worker actor instances. However,
keep in mind that there is no way to control which message is routed to which actor.
Monitored IO action
This package provides a facility for supervising IO actions. With the types and functions described in this section, you can run an IO action in its own thread and receive notification of its termination, along with the reason for termination, in another thread. The functions in this section provide guaranteed supervision of your thread.
It looks similar to UnliftIO.bracket. What distinguishes it from
bracket is that it is guaranteed to work through the entire lifetime of a thread.
Use UnliftIO.bracket when you need guaranteed cleanup of resources acquired
within the same thread. It works as you expect. However, installing a callback
for thread supervision using bracket (or UnliftIO.finally, or even the low-level
UnliftIO.catch) within a thread has NO guarantee. There is a small window
in which an asynchronous exception can be thrown after the thread has started but
before the callback is installed. We will discuss this later in this section.
Notification is delivered via a user-supplied callback. The helper functions described in this section install your callback on your IO action. The callback is then called on termination of the IO action.
Important: Callback is called in terminated thread
The callback is called in the terminated thread. You have to use inter-thread communication to notify another thread.
The user-supplied callback receives an ExitReason and a UnliftIO.Concurrent.ThreadId
so that the user can determine which thread was terminated and why it was
terminated. To receive those parameters, the user-supplied callback must
have the type Monitor, which is the following.
ExitReason -> ThreadId -> IO ()
The function watch installs your callback on your plain IO action and returns
a monitored action.
Callbacks can be nested. Use nestWatch to install another callback on an already
monitored action.
The helper functions return an IO action of type MonitoredAction instead of
plain IO (). The rest of this section is a slightly
technical deep dive into why it has such a type.
The type of MonitoredAction is this.
(IO () -> IO ()) -> IO ()
It requires an extra function argument because a MonitoredAction is
invoked with UnliftIO.Concurrent.forkIOWithUnmask.
To ensure that the termination callback works no matter when the thread is
interrupted, the callback must be installed while asynchronous exceptions are
masked. At the same time, to allow the thread to be killed from another thread,
the body of the IO action must be executed with asynchronous exceptions
unmasked. To satisfy both conditions, the IO action and the callback must be called using
UnliftIO.Concurrent.forkIOWithUnmask. Typically it looks like the following.
mask_ $ forkIOWithUnmask $ \unmask -> unmask action `finally` callback
The extra function parameter in the type of MonitoredAction
accepts the unmask function passed by
UnliftIO.Concurrent.forkIOWithUnmask. The functions defined in this section help
install callbacks and convert types to fit
UnliftIO.Concurrent.forkIOWithUnmask.
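The masking pattern described above can be tried out with base only. This is a sketch, not the package’s watch function: SketchMonitor and forkMonitored are hypothetical names, and the outcome is reported as an Either rather than the package’s ExitReason. It uses Control.Concurrent.forkIOWithUnmask from base instead of the UnliftIO wrapper.

```haskell
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, mask_, try)

-- | Hypothetical simplification of a monitor callback: it receives the
-- outcome of the action and the ThreadId of the thread that ran it.
type SketchMonitor = Either SomeException () -> ThreadId -> IO ()

-- | Fork an action so that the monitor is called however the action ends.
forkMonitored :: SketchMonitor -> IO () -> IO ThreadId
forkMonitored monitor action =
    mask_ $ forkIOWithUnmask $ \unmask -> do
        -- Still masked here, so the handler set up by try cannot be skipped.
        -- Only the user's action itself runs unmasked and can be killed.
        outcome <- try (unmask action)
        self <- myThreadId
        monitor outcome self   -- runs in the terminated thread

monitorDemo :: IO (String, String)
monitorDemo = do
    reports <- newEmptyMVar
    let monitor outcome _ =
            putMVar reports (either (const "exception") (const "normal") outcome)
    -- First child finishes on its own.
    _ <- forkMonitored monitor (pure ())
    first <- takeMVar reports
    -- Second child is killed from outside while sleeping.
    tid <- forkMonitored monitor (threadDelay 60000000)
    killThread tid
    second <- takeMVar reports
    pure (first, second)

main :: IO ()
main = monitorDemo >>= print   -- ("normal","exception")
```

The MVar plays the role of the inter-thread communication mentioned earlier, since the callback itself runs in the thread that is terminating.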
Child specification - supervisable process
ChildSpec is the casting mold for the child thread IO action that a supervisor spawns
and manages. It is passed to a supervisor, and the supervisor then runs it in its
own thread, monitors it, and restarts it if needed. ChildSpec adds one
attribute to MonitoredAction for controlling restart on thread termination:
Restart. Restart represents the restart type concept that comes from
Erlang/OTP. The value of Restart defines how a restart operation by the supervisor
is triggered on termination of the thread. A ChildSpec with the Permanent restart
type triggers a restart regardless of the reason for termination. It
triggers a restart even on normal exit. Transient triggers a restart only
when the thread is terminated by an exception. Temporary never triggers
a restart.
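The rules for the three restart types can be written down as a small decision function. This is a sketch of the rules as stated above, with hypothetical types (SketchRestart, SketchExit) that are deliberately simpler than the package’s Restart and ExitReason.

```haskell
-- | Hypothetical stand-in for the restart type of a child.
data SketchRestart = SketchPermanent | SketchTransient | SketchTemporary

-- | Hypothetical, simplified reason of termination.
data SketchExit = ExitedNormally | ExitedByException

-- | Does termination of a child trigger a restart operation?
needsRestart :: SketchRestart -> SketchExit -> Bool
needsRestart SketchPermanent _                 = True   -- even on normal exit
needsRestart SketchTransient ExitedByException = True
needsRestart SketchTransient ExitedNormally    = False
needsRestart SketchTemporary _                 = False  -- never restarted

main :: IO ()
main = do
    print (needsRestart SketchPermanent ExitedNormally)     -- True
    print (needsRestart SketchTransient ExitedNormally)     -- False
    print (needsRestart SketchTransient ExitedByException)  -- True
    print (needsRestart SketchTemporary ExitedByException)  -- False
```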
Refer to the Erlang/OTP documentation for more detail on the restart type concept.
newMonitoredChildSpec creates a new ChildSpec from a MonitoredAction and a
restart type value. newChildSpec is a shortcut function that creates a ChildSpec
from a plain IO action and a restart type value. addMonitor adds another
monitor to an existing ChildSpec.
Behaviors
This package provides the state machine, server, and supervisor behaviors from Erlang/OTP with slight modifications.
All behaviors available in this package are defined as ActorHandler so that
they can be easily supervised by converting them to actors using newActor.
The server behavior is built upon the state machine behavior. The supervisor is built on top of the server behavior.
State Machine behavior
The state machine behavior is the most essential behavior in this package. It provides
a framework for creating the IO action of a finite state machine running in its own
thread. A state machine has a single Inbox, its local state, and a user-supplied
message handler. A state machine is created with an initial state value. It waits for
an incoming message, passes the received message and the current state to the user-supplied
handler, updates the state to the value returned from the handler, and then stops or
continues to listen to the message queue based on what the handler returned.
To create a new state machine, prepare the initial state of your state machine,
define the message handler driving it, and apply newStateMachine
to the initial state and the handler. You will get an ActorHandler, so you can get
an actor of the state machine by applying newActor to it.
Actor queue action <- newActor $ newStateMachine initialState handler
Or you can use the shortcut helper.
Actor queue action <- newStateMachineActor initialState handler
newStateMachineActor returns the write-end of the message queue for the state machine
and the IO action to run, wrapped in Actor. You can run the IO action with Control.Concurrent.forkIO
or Control.Concurrent.Async.async, or you can let a supervisor run it.
The user-supplied message handler must have the following type signature.
handler :: (state -> message -> IO (Either result state))
When a message is sent to the state machine’s queue, it is automatically received by
the state machine framework, then the handler is called with the current state and the
message. The handler must return either a result or the next state. When Left result is
returned, the state machine stops and the IO action returns
result. When Right state is returned, the state machine updates its
current state to the returned state and waits for the next incoming message.
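The loop just described can be sketched in a few lines of base-only Haskell. This is an illustration of the Left/Right protocol, not the package’s newStateMachine: runStateMachineSketch is a hypothetical name and a plain Chan stands in for Inbox.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- | Receive a message, hand it to the handler with the current state,
-- then either stop with a result (Left) or loop with the next state (Right).
runStateMachineSketch
    :: Chan message
    -> state
    -> (state -> message -> IO (Either result state))
    -> IO result
runStateMachineSketch inbox state handler = do
    message <- readChan inbox
    next <- handler state message
    case next of
        Left result    -> pure result
        Right newState -> runStateMachineSketch inbox newState handler

-- Example: a counter that accumulates until told to stop.
data CounterMessage = Increment Int | Stop

counterHandler :: Int -> CounterMessage -> IO (Either Int Int)
counterHandler total (Increment n) = pure (Right (total + n))
counterHandler total Stop          = pure (Left total)

counterDemo :: IO Int
counterDemo = do
    inbox <- newChan
    done <- newEmptyMVar
    _ <- forkIO (runStateMachineSketch inbox 0 counterHandler >>= putMVar done)
    mapM_ (writeChan inbox) [Increment 1, Increment 2, Increment 3, Stop]
    takeMVar done

main :: IO ()
main = counterDemo >>= print   -- 6
```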
Server behavior
The server behavior provides synchronous request-response style communication with an actor, a.k.a. the ask pattern. It allows the user to send a request to an actor and then wait for a response from the actor. This package provides a framework for implementing such an actor.
The server behavior in this package is actually a set of helper functions and a type synonym that help implement the ask pattern over an actor. The user needs to follow the rules described below to use those helpers.
Define ADT type for messages
First, the user needs to define an algebraic data type for messages to the server in the following form.
data MyServerCommand
    = ReqWithoutResp1
    | ReqWithoutResp2 Arg1
    | ReqWithoutResp3 Arg2 Arg3
    | ReqWithResp1 (ServerCallback Result1)
    | ReqWithResp2 ArgX (ServerCallback Result2)
    | ReqWithResp3 ArgY ArgZ (ServerCallback Result3)
The rules are:
- Define an ADT containing all requests.
- If a request doesn’t return a response, define its constructor as an ordinary alternative of the sum type.
- If a request returns a response, put (ServerCallback ResultType) as the last argument of the constructor for the request, where ResultType is the type of the returned value.
ServerCallback is a type synonym for a function type, as follows.
type ServerCallback a = (a -> IO ())
So the real definition of your MyServerCommand is:
data MyServerCommand
    = ReqWithoutResp1
    | ReqWithoutResp2 Arg1
    | ReqWithoutResp3 Arg2 Arg3
    | ReqWithResp1 (Result1 -> IO ())
    | ReqWithResp2 ArgX (Result2 -> IO ())
    | ReqWithResp3 ArgY ArgZ (Result3 -> IO ())
Define message handler
Next, the user needs to define an actor handling the messages. In this example, we use the state machine behavior so that we can focus on the core message handling part. For simplicity, this example has no internal state and never finishes.
Define a state machine message handler handling MyServerCommand.
myHandler :: () -> MyServerCommand -> IO (Either () ())
myHandler _ ReqWithoutResp1 = doJob1 $> Right ()
myHandler _ (ReqWithoutResp2 arg1) = doJob2 arg1 $> Right ()
myHandler _ (ReqWithoutResp3 arg2 arg3) = doJob3 arg2 arg3 $> Right ()
myHandler _ (ReqWithResp1 cont1) = (doJob4 >>= cont1) $> Right ()
myHandler _ (ReqWithResp2 argX cont2) = (doJob5 argX >>= cont2) $> Right ()
myHandler _ (ReqWithResp3 argY argZ cont3) = (doJob6 argY argZ >>= cont3) $> Right ()
The core idea here is to implement the request handler in CPS style. If a request returns a response, the request message comes with a callback function (a.k.a. continuation). You can send back the response to the request by calling the callback.
Requesting to server
The functions call, callAsync, and callIgnore are helpers for implementing
request-response communication with a server. They install a callback in the message,
send the message, and return the response to the caller. They receive a partially applied
server message constructor, apply it to a callback function, then send the result to the
server. The installed callback handles the response from the server. You can use
call as follows.
maybeResult1 <- call def myServerActor ReqWithResp1
maybeResult2 <- call def myServerActor $ ReqWithResp2 argX
maybeResult3 <- call def myServerActor $ ReqWithResp3 argY argZ
When you send a request without response, use cast.
cast myServerActor ReqWithoutResp1
cast myServerActor $ ReqWithoutResp2 arg1
cast myServerActor $ ReqWithoutResp3 arg2 arg3
When you send a request with response but ignore it, use callIgnore.
callIgnore myServerActor ReqWithResp1
callIgnore myServerActor $ ReqWithResp2 argX
callIgnore myServerActor $ ReqWithResp3 argY argZ
Generally, the ask pattern, or synchronous request-response communication, is not
recommended in the actor model, because a synchronous request blocks the entire
actor until it receives a response or times out. You can mitigate this by
wrapping the synchronous call with async. Use callAsync for that purpose.
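How a callback gets installed into a partially applied constructor can be sketched with base alone. This is an illustration of the idea and makes no claim about how the package implements call and cast: callSketch, castSketch, SketchCallback and CalcCommand are hypothetical names, a Chan stands in for the actor queue, and the explicit timeout in microseconds is an assumption of the sketch.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import System.Timeout (timeout)

type SketchCallback a = a -> IO ()

-- Requests with a response carry a callback as their last argument.
data CalcCommand
    = Reset
    | Add Int
    | GetTotal (SketchCallback Int)

-- | Complete the constructor with a callback that fills an MVar, send the
-- request, then wait for the reply up to the given number of microseconds.
callSketch :: Int -> Chan command -> (SketchCallback a -> command) -> IO (Maybe a)
callSketch micros queue mkCommand = do
    reply <- newEmptyMVar
    writeChan queue (mkCommand (putMVar reply))
    timeout micros (takeMVar reply)

-- | Fire-and-forget request.
castSketch :: Chan command -> command -> IO ()
castSketch = writeChan

-- | A tiny server keeping a running total.
calcServer :: Chan CalcCommand -> Int -> IO ()
calcServer queue total = do
    command <- readChan queue
    case command of
        Reset          -> calcServer queue 0
        Add n          -> calcServer queue (total + n)
        GetTotal reply -> reply total >> calcServer queue total

callDemo :: IO (Maybe Int)
callDemo = do
    queue <- newChan
    _ <- forkIO (calcServer queue 0)
    castSketch queue (Add 2)
    castSketch queue (Add 40)
    callSketch 1000000 queue GetTotal

main :: IO ()
main = callDemo >>= print
```

The caller blocks in takeMVar while the server works, which is the blocking behavior that makes the ask pattern something to use sparingly.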
Supervisor behavior
WIP
Supervisor behavior provides Erlang/OTP like thread supervision with some simplification.
Design Considerations
Separate role of threads
When you design a thread hierarchy with this package, you have to follow the design rule of Erlang/OTP that only supervisors can have child threads.
In Erlang/OTP, there are two types of Erlang process.
- Supervisor
- Worker
A supervisor has child processes and supervises them. A worker does the real work but never has a child process.
Without this rule, you would need both supervision functionality and real task processing functionality within a single process. That leads to a more complex process implementation.
With this rule, a worker no longer has to take care of supervising children. At the same time, you cannot create a child process directly from a worker.
Key Difference from Erlang/OTP Supervisor
- Mutable variables are shared
- Dynamic children are always Temporary
- No shutdown method to terminate child
- No RestForOne strategy
- Every actor has dedicated Haskell thread
Mutable variables are shared
While “share nothing” is a key concept of Erlang, there is no such guarantee in this package. A message passed from one Haskell thread to another is shared between both threads. This isn’t a problem as long as the message content is a normal Haskell object. Normal Haskell objects are immutable. Nobody mutates their values. So, for normal Haskell objects, sharing is equivalent to copying.
However, when you pass a mutable object like an IORef, MVar, or TVar, do it with care. Those objects can be mutated by other threads.
Dynamic children are always Temporary
A child thread created by newChild is always created as a Temporary child,
regardless of which restart type is designated in its spec. Temporary children
are never restarted by the supervisor. A Permanent or Transient child must
be part of the ChildSpec list given to the supervisor spec.
No shutdown method to terminate child
When a supervisor terminates its children, it always throws an asynchronous
exception to them. There is no option like the exit(Child, shutdown) found in
Erlang/OTP.
You must implement appropriate resource cleanup on asynchronous exceptions. You can implement graceful shutdown yourself, and it is always a good practice. However, it does not allow you to escape from dealing with asynchronous exceptions. This package does not force you to implement graceful shutdown but leaves it as your choice.
No RestForOne strategy
Only the OneForOne and OneForAll restart strategies are supported.
Every actor has dedicated Haskell thread
Unlike some other actor implementations, each actor in this package has its own Haskell thread. This means every actor has its own dedicated stack. Thus, calling a blocking API in the middle of message handling does NOT prevent other actors from running.
Some actor implementations give a thread and stack to an actor only while it handles an incoming message. In such implementations, an actor has no thread or stack while it is waiting for the next message. This maximizes scalability. Even with billions of actors, you only need n threads and stacks on an n-core processor.
A downside of such implementations is that they strictly disallow blocking operations in the middle of message handling. Calling a blocking API in an actor system running on a single thread stalls the entire actor system until the blocking API returns.
That doesn’t happen in this package. Even if you call a blocking API in the middle of an actor message handler, other Haskell threads continue running.
Giving a dedicated thread to each actor requires giving a dedicated stack to each actor too. It consumes more memory than the design above. However, in Haskell, this is not a serious problem, for the following reasons.
- In Haskell, a thread’s stack starts at 1KB and grows as needed.
- It can be moved by the GC, so no contiguous address space needs to be reserved up front.
This is one of the greatest characteristics of GHC’s runtime. This package decided to leverage it.
Resource management
The word resource in this context means an object kept by the runtime but not garbage collected. For example, file handles, network sockets, and threads are resources. In Haskell, losing the reference to those objects does NOT mean they will be closed or terminated. You have to explicitly close handles and sockets and terminate threads before you lose your references to them.
This becomes more complex in a threaded GHC environment. Under GHC, a thread can receive an asynchronous exception at any time. You have to clean up resources when your thread receives an asynchronous exception, as well as on normal exit and on synchronous exceptions.
This package takes care of threads managed by a supervisor, but you have to take care of any other resources.
Changes
Changelog for thread-supervisor
Version 0.2.0.0
- Breaking API change - hide ActorQ constructor.
- Introduce new functions - sendToMe and trySendToMe
Version 0.1.0.1
- Include .cabal file in source repo.
- Migrate CI from Travis to GitHub Actions.
- Build and test for multiple platforms and GHC versions.
Version 0.1.0.0
- First release