layout: post
title: "Messages and Agents"
description: "Making it easier to think about concurrency"
nav: why-use-fsharp
seriesId: "Why use F#?"
seriesOrder: 25

categories: [Concurrency]

In this post, we’ll look at the message-based (or actor-based) approach to concurrency.

In this approach, when one task wants to communicate with another, it sends it a message, rather than contacting it directly. The messages are put on a queue, and the receiving task (known as an “actor” or “agent”) pulls the messages off the queue one at a time to process them.

This message-based approach has been applied to many situations, from low-level network sockets (built on TCP/IP) to enterprise-wide application integration systems (for example, MSMQ or IBM WebSphere MQ).

From a software design point of view, a message-based approach has a number of benefits:

  • You can manage shared data and resources without locks.
  • You can easily follow the “single responsibility principle”, because each agent can be designed to do only one thing.
  • It encourages a “pipeline” model of programming with “producers” sending messages to decoupled “consumers”, which has additional benefits:
    • The queue acts as a buffer, eliminating waiting on the client side.
    • It is straightforward to scale up one side or the other of the queue as needed in order to maximize throughput.
    • Errors can be handled gracefully, because the decoupling means that agents can be created and destroyed without affecting their clients.

From a practical developer’s point of view, what I find most appealing about the message-based approach is that when writing the code for any given actor, you don’t have to hurt your brain by thinking about concurrency. The message queue forces a “serialization” of operations that otherwise might occur concurrently. And this in turn makes it much easier to think about (and write code for) the logic for processing a message, because you can be sure that your code will be isolated from other events that might interrupt your flow.

With these advantages, it is not surprising that when a team inside Ericsson wanted to design a programming language for writing highly-concurrent telephony applications, they created one with a message-based approach, namely Erlang. Erlang has now become the poster child for the whole topic, and has created a lot of interest in implementing the same approach in other languages.

How F# implements a message-based approach

F# has a built-in agent class called MailboxProcessor. These agents are very lightweight compared with threads - you can instantiate tens of thousands of them at the same time.

These are similar to the agents in Erlang, but unlike the Erlang ones, they do not work across process boundaries, only in the same process.
And unlike a heavyweight queueing system such as MSMQ, the messages are not persistent. If your app crashes, the messages are lost.

But these are minor issues, and can be worked around. In a future series, I will go into alternative implementations of message queues. The fundamental approach is the same in all cases.

Let’s see a simple agent implementation in F#:

    #nowarn "40"

    let printerAgent = MailboxProcessor.Start(fun inbox ->

        // the message processing function
        let rec messageLoop = async {

            // read a message
            let! msg = inbox.Receive()

            // process a message
            printfn "message is: %s" msg

            // loop to top
            return! messageLoop
            }

        // start the loop
        messageLoop
        )

The MailboxProcessor.Start function takes a simple function parameter. That function loops forever, reading messages from the queue (or “inbox”) and processing them.

Note: I have added the #nowarn "40" pragma to suppress warning FS0040, which is triggered because messageLoop is a recursive value used within its own definition. The warning can be safely ignored in this case.
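
As an aside, you can avoid the warning entirely by making the loop a function of unit rather than a recursive value. Here is a minimal sketch of that alternative (the name printerAgent2 is made up for this illustration):

    // making messageLoop a function rather than a recursive value
    // avoids warning FS0040, so no #nowarn pragma is needed
    let printerAgent2 = MailboxProcessor.Start(fun inbox ->
        let rec messageLoop() = async {
            let! msg = inbox.Receive()
            printfn "message is: %s" msg
            return! messageLoop()
            }
        messageLoop()
        )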

Here’s the example in use:

    // test it
    printerAgent.Post "hello"
    printerAgent.Post "hello again"
    printerAgent.Post "hello a third time"
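
Now that we’ve seen the basic pattern, here is a rough sketch to back up the earlier claim that agents are lightweight: it creates ten thousand of them in one go (the count and the name manyAgents are arbitrary, purely for illustration), and on a typical machine it completes quickly:

    // create 10,000 trivial agents at once;
    // each one handles just a single message
    let manyAgents =
        [1..10000]
        |> List.map (fun i ->
            MailboxProcessor.Start(fun inbox -> async {
                let! msg = inbox.Receive()
                printfn "agent %i received %s" i msg
                }))

    // post a message to the first agent
    (List.head manyAgents).Post "hello"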

In the rest of this post we’ll look at two slightly more useful examples:

  • Managing shared state without locks
  • Serialized and buffered access to shared IO

In both of these cases, a message-based approach to concurrency is elegant, efficient, and easy to program.

Managing shared state

Let’s look at the shared state problem first.

A common scenario is that you have some state that needs to be accessed and changed by multiple concurrent tasks or threads.
We’ll use a very simple case, and say that the requirements are:

  • A shared “counter” and “sum” that can be incremented by multiple tasks concurrently.
  • Changes to the counter and sum must be atomic — we must guarantee that they will both be updated at the same time.

The locking approach to shared state

Using locks or mutexes is a common solution for these requirements, so let’s write some code using a lock, and see how it performs.

First let’s write a static LockedCounter class that protects the state with locks.

    open System
    open System.Threading
    open System.Diagnostics

    // a utility function
    type Utility() =
        static let rand = new Random()

        static member RandomSleep() =
            let ms = rand.Next(1,10)
            Thread.Sleep ms

    // an implementation of a shared counter using locks
    type LockedCounter () =

        static let _lock = new Object()

        static let mutable count = 0
        static let mutable sum = 0

        static let updateState i =
            // increment the counters and...
            sum <- sum + i
            count <- count + 1
            printfn "Count is: %i. Sum is: %i" count sum

            // ...emulate a short delay
            Utility.RandomSleep()

        // public interface to hide the state
        static member Add i =
            // see how long a client has to wait
            let stopwatch = new Stopwatch()
            stopwatch.Start()

            // start lock. Same as C# lock{...}
            lock _lock (fun () ->

                // see how long the wait was
                stopwatch.Stop()
                printfn "Client waited %i" stopwatch.ElapsedMilliseconds

                // do the core logic
                updateState i
                )
            // release lock
Some notes on this code:

  • This code is written using a very imperative approach, with mutable variables and locks.
  • The public Add method uses the built-in lock function to acquire and release the lock. This is the equivalent of the lock{...} statement in C#. (A rough sketch of what lock does follows these notes.)
  • We’ve also added a stopwatch to measure how long a client has to wait to get the lock.
  • The core “business logic” is the updateState method, which not only updates the state, but adds a small random wait as well to emulate the time taken to do the processing.
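
For reference, the built-in lock function used in Add behaves roughly like the following Monitor-based sketch (an illustrative approximation, not the actual FSharp.Core source; the name myLock is made up):

    // roughly what the built-in 'lock' function does
    let myLock (lockObj:obj) action =
        System.Threading.Monitor.Enter lockObj
        try
            action()
        finally
            System.Threading.Monitor.Exit lockObj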

Let’s test it in isolation:

    // test in isolation
    LockedCounter.Add 4
    LockedCounter.Add 5

Next, we’ll create a task that will try to access the counter:

    let makeCountingTask addFunction taskId = async {
        let name = sprintf "Task%i" taskId
        for i in [1..3] do
            addFunction i
        }

    // test in isolation
    let task = makeCountingTask LockedCounter.Add 1
    Async.RunSynchronously task

In this case, when there is no contention at all, the wait times are all 0.

But what happens when we create 10 child tasks that all try to access the counter at once?

    let lockedExample5 =
        [1..10]
        |> List.map (fun i -> makeCountingTask LockedCounter.Add i)
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore

Oh dear! Most tasks are now waiting quite a while. If two tasks want to update the state at the same time, one must wait for the other’s work to complete before it can do its own work, which affects performance.

And if we add more and more tasks, the contention will increase, and the tasks will spend more and more time waiting rather than working.
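
To see the effect for yourself, here is a minimal sketch that times a whole run for a given number of tasks (the helper name timeLockedRun is made up for this illustration, and actual timings will vary by machine). Because the lock serializes every update, the total elapsed time grows steadily as tasks are added:

    // time a run of N contending tasks against the locked counter
    let timeLockedRun taskCount =
        let stopwatch = Stopwatch.StartNew()
        [1..taskCount]
        |> List.map (fun i -> makeCountingTask LockedCounter.Add i)
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore
        printfn "%i tasks took %ims" taskCount stopwatch.ElapsedMilliseconds

    timeLockedRun 10
    timeLockedRun 20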

The message-based approach to shared state

Let’s see how a message queue might help us. Here’s the message-based version:

    type MessageBasedCounter () =

        static let updateState (count,sum) msg =

            // increment the counters and...
            let newSum = sum + msg
            let newCount = count + 1
            printfn "Count is: %i. Sum is: %i" newCount newSum

            // ...emulate a short delay
            Utility.RandomSleep()

            // return the new state
            (newCount,newSum)

        // create the agent
        static let agent = MailboxProcessor.Start(fun inbox ->

            // the message processing function
            let rec messageLoop oldState = async {

                // read a message
                let! msg = inbox.Receive()

                // do the core logic
                let newState = updateState oldState msg

                // loop to top
                return! messageLoop newState
                }

            // start the loop
            messageLoop (0,0)
            )

        // public interface to hide the implementation
        static member Add i = agent.Post i

Some notes on this code:

  • The core “business logic” is again in the updateState method, which has almost the same implementation as the earlier example, except the state is immutable, so that a new state is created and returned to the main loop.
  • The agent reads messages (simple ints in this case) and then calls the updateState method.
  • The public method Add posts a message to the agent, rather than calling the updateState method directly.
  • This code is written in a more functional way; there are no mutable variables and no locks anywhere. In fact, there is no code dealing with concurrency at all!
    The code only has to focus on the business logic, and is consequently much easier to understand.

Let’s test it in isolation:

    // test in isolation
    MessageBasedCounter.Add 4
    MessageBasedCounter.Add 5

Next, we’ll reuse the counting task we defined earlier, this time calling MessageBasedCounter.Add instead:

    let task = makeCountingTask MessageBasedCounter.Add 1
    Async.RunSynchronously task

Finally let’s create 5 child tasks that try to access the counter at once.

    let messageExample5 =
        [1..5]
        |> List.map (fun i -> makeCountingTask MessageBasedCounter.Add i)
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore

We can’t measure the waiting time for the clients, because there is none!
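
Note that one thing MessageBasedCounter does not yet provide is a way to read the current state from outside the agent. Here is a minimal sketch of how that might be added using the MailboxProcessor’s built-in PostAndReply mechanism and a two-case message type (the names CounterMessage, Fetch and FetchingCounter are made up for this illustration):

    type CounterMessage =
        | Add of int
        | Fetch of AsyncReplyChannel<int * int>

    type FetchingCounter () =
        static let agent = MailboxProcessor.Start(fun inbox ->
            let rec messageLoop (count,sum) = async {
                let! msg = inbox.Receive()
                match msg with
                | Add i ->
                    // update the state and loop
                    return! messageLoop (count+1, sum+i)
                | Fetch replyChannel ->
                    // send the current state back to the caller
                    replyChannel.Reply (count,sum)
                    return! messageLoop (count,sum)
                }
            messageLoop (0,0)
            )

        // public interface
        static member Add i = agent.Post (Add i)
        static member Fetch() = agent.PostAndReply Fetch

    // usage
    FetchingCounter.Add 4
    FetchingCounter.Fetch()    // returns (1,4)

Because PostAndReply blocks until the agent replies, and the reply request is queued behind any pending updates, the state you get back is consistent with all the messages posted before it.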

Shared IO

A similar concurrency problem occurs when accessing a shared IO resource such as a file:

  • If the IO is slow, the clients can spend a lot of time waiting, even without locks.
  • If multiple threads write to the resource at the same time, you can get corrupted data.

Both problems can be solved by using asynchronous calls combined with buffering — exactly what a message queue does.

For this next example, we’ll consider a logging service that many clients will write to concurrently.
(In this trivial case, we’ll just write directly to the Console.)

We’ll first look at an implementation without concurrency control, and then at an implementation that uses message queues to serialize all requests.

IO without serialization

In order to make the corruption very obvious and repeatable, let’s first create a “slow” console that writes each individual character in the log message
and pauses for a millisecond between each character. During that millisecond, another thread could be writing as well, causing an undesirable
interleaving of messages.

    let slowConsoleWrite msg =
        msg |> String.iter (fun ch ->
            System.Threading.Thread.Sleep(1)
            System.Console.Write ch
            )

    // test in isolation
    slowConsoleWrite "abc"

Next, we will create a simple task that loops a few times, writing its name each time to the logger:

    let makeTask logger taskId = async {
        let name = sprintf "Task%i" taskId
        for i in [1..3] do
            let msg = sprintf "-%s:Loop%i-" name i
            logger msg
        }

    // test in isolation
    let task = makeTask slowConsoleWrite 1
    Async.RunSynchronously task

Next, we write a logging class that encapsulates access to the slow console. It has no locking or serialization, and is basically not thread-safe:

    type UnserializedLogger() =
        // interface
        member this.Log msg = slowConsoleWrite msg

    // test in isolation
    let unserializedLogger = UnserializedLogger()
    unserializedLogger.Log "hello"

Now let’s combine all these into a real example. We will create five child tasks and run them in parallel, all trying to write to the slow console.

    let unserializedExample =
        let logger = new UnserializedLogger()
        [1..5]
        |> List.map (fun i -> makeTask logger.Log i)
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore

Ouch! The output is very garbled!

Serialized IO with messages

So what happens when we replace UnserializedLogger with a SerializedLogger class that encapsulates a message queue?

The agent inside SerializedLogger simply reads a message from its input queue and writes it to the slow console. Again there is no code dealing with concurrency and no locks are used.

    type SerializedLogger() =

        // create the mailbox processor
        let agent = MailboxProcessor.Start(fun inbox ->

            // the message processing function
            let rec messageLoop () = async {

                // read a message
                let! msg = inbox.Receive()

                // write it to the log
                slowConsoleWrite msg

                // loop to top
                return! messageLoop ()
                }

            // start the loop
            messageLoop ()
            )

        // public interface
        member this.Log msg = agent.Post msg

    // test in isolation
    let serializedLogger = SerializedLogger()
    serializedLogger.Log "hello"

So now we can repeat the earlier unserialized example but using the SerializedLogger instead. Again, we create five child tasks and run them in parallel:

    let serializedExample =
        let logger = new SerializedLogger()
        [1..5]
        |> List.map (fun i -> makeTask logger.Log i)
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore

What a difference! This time the output is perfect.

Summary

There is much more to say about this message-based approach. In a future series, I hope to go into much more detail, including discussion of topics such as:

  • alternative implementations of message queues with MSMQ and TPL Dataflow.
  • cancellation and out-of-band messages.
  • error handling and retries, and handling exceptions in general.
  • how to scale up and down by creating or removing child agents.
  • avoiding buffer overruns and detecting starvation or inactivity.