layout: post
title: “Functional Reactive Programming”
description: “Turning events into streams”
nav: why-use-fsharp
seriesId: “Why use F#?”
seriesOrder: 26

categories: [Concurrency]

Events are everywhere. Almost every program has to handle events, whether it be button clicks in the user interface, listening to sockets in a server, or even a system shutdown notification.

And events are the basis of one of the most common OO design patterns: the “Observer” pattern.

But as we know, event handling, like concurrency in general, can be tricky to implement. Simple event logic is straightforward, but what about logic like “do something if two events happen in a row but do something different if only one event happens” or “do something if two events happen at roughly the same time”? And how easy is it to combine these requirements in other, more complex ways?

Even if you can successfully implement these requirements, the code tends to be spaghetti-like and hard to understand, even with the best intentions.

Is there an approach that can make event handling easier?

We saw in the previous post on message queues that one of the advantages of that approach was that the requests were “serialized”, making them conceptually easier to deal with.

There is a similar approach that can be used with events. The idea is to turn a series of events into an “event stream”.
Event streams then become quite like IEnumerables, and so the obvious next step
is to treat them in much the same way that LINQ handles collections, so that they can be filtered, mapped, split and combined.

F# has built-in support for this model, as well as for the more traditional approach.
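To make the LINQ comparison concrete, here is a minimal sketch that filters and maps a stream of events. The `numberEvent` source and the `received` list are made up for illustration, and the events are triggered by hand so that the result is predictable.

```fsharp
// A hypothetical event source that we can trigger by hand
let numberEvent = Event<int>()

// Collect whatever reaches the end of the pipeline
let received = ResizeArray<string>()

let subscription =
    numberEvent.Publish
    |> Observable.filter (fun n -> n % 2 = 0)          // keep the even numbers
    |> Observable.map (fun n -> sprintf "even %i" n)   // transform each one
    |> Observable.subscribe (fun s -> received.Add s)

// Fire six events into the stream
for n in 1 .. 6 do
    numberEvent.Trigger n

printfn "%A" (List.ofSeq received)   // ["even 2"; "even 4"; "even 6"]
```

The pipeline reads just like the equivalent list pipeline would, except that the items arrive over time rather than all at once.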

A simple event stream

Let’s start with a simple example to compare the two approaches. We’ll implement the classic event handler approach first.

First, we define a utility function that will:

  • create a timer
  • register a handler for the Elapsed event
  • run the timer for five seconds and then stop it

Here’s the code:

    open System
    open System.Threading

    /// create a timer and register an event handler,
    /// then run the timer for five seconds
    let createTimer timerInterval eventHandler =
        // setup a timer
        let timer = new System.Timers.Timer(float timerInterval)
        timer.AutoReset <- true

        // add an event handler
        timer.Elapsed.Add eventHandler

        // return an async task
        async {
            // start timer...
            timer.Start()
            // ...run for five seconds...
            do! Async.Sleep 5000
            // ... and stop
            timer.Stop()
        }

Now test it interactively:

    // create a handler. The event args are ignored
    let basicHandler _ = printfn "tick %A" DateTime.Now

    // register the handler
    let basicTimer1 = createTimer 1000 basicHandler

    // run the task now
    Async.RunSynchronously basicTimer1

Now let’s create a similar utility method to create a timer, but this time it will return an “observable” as well, which is the stream of events.

    let createTimerAndObservable timerInterval =
        // setup a timer
        let timer = new System.Timers.Timer(float timerInterval)
        timer.AutoReset <- true

        // events are automatically IObservable
        let observable = timer.Elapsed

        // create an async task
        let task = async {
            timer.Start()
            do! Async.Sleep 5000
            timer.Stop()
            }

        // return the async task and the observable
        (task,observable)

And again test it interactively:

    // create the timer and the corresponding observable
    let basicTimer2 , timerEventStream = createTimerAndObservable 1000

    // register that every time something happens on the
    // event stream, print the time.
    timerEventStream
    |> Observable.subscribe (fun _ -> printfn "tick %A" DateTime.Now)

    // run the task now
    Async.RunSynchronously basicTimer2

The difference is that instead of registering a handler directly with an event,
we are “subscribing” to an event stream. Subtly different, and important.
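One practical consequence of subscribing is that you get something back. The sketch below uses a hand-triggered event (`tickEvent` and `messages` are names invented for this example) to show that `Observable.subscribe` returns an `IDisposable`, which can be used later to detach the subscriber.

```fsharp
// A hypothetical event source that we can trigger by hand
let tickEvent = Event<string>()
let messages = ResizeArray<string>()

// Classic style: attach a handler directly. Nothing is returned.
tickEvent.Publish.Add (fun msg -> messages.Add ("handler got " + msg))

// Stream style: subscribing returns an IDisposable subscription
let subscription =
    tickEvent.Publish
    |> Observable.subscribe (fun msg -> messages.Add ("subscriber got " + msg))

tickEvent.Trigger "first"

// Disposing the subscription detaches the subscriber from the stream...
subscription.Dispose()

// ...so only the directly registered handler sees this one
tickEvent.Trigger "second"

printfn "%A" (List.ofSeq messages)
```

The subscriber records "first" but not "second", while the directly registered handler stays attached and records both.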

Counting events

In this next example, we’ll have a slightly more complex requirement:

  1. Create a timer that ticks every 500ms.
  2. At each tick, print the number of ticks so far and the current time.

To do this in a classic imperative way, we would probably create a class with a mutable counter, as below:

    type ImperativeTimerCount() =

        let mutable count = 0

        // the event handler. The event args are ignored
        member this.handleEvent _ =
            count <- count + 1
            printfn "timer ticked with count %i" count

We can reuse the utility functions we created earlier to test it:

    // create a handler class
    let handler = new ImperativeTimerCount()

    // register the handler method
    let timerCount1 = createTimer 500 handler.handleEvent

    // run the task now
    Async.RunSynchronously timerCount1

Let’s see how we would do this same thing in a functional way:

    // create the timer and the corresponding observable
    let timerCount2, timerEventStream = createTimerAndObservable 500

    // set up the transformations on the event stream
    timerEventStream
    |> Observable.scan (fun count _ -> count + 1) 0
    |> Observable.subscribe (fun count -> printfn "timer ticked with count %i" count)

    // run the task now
    Async.RunSynchronously timerCount2

Here we see how you can build up layers of event transformations, just as you do with list transformations in LINQ.

The first transformation is scan, which accumulates state for each event. It is roughly equivalent to the List.fold function that we have seen used with lists, except that it emits the accumulated state after every event rather than a single final result.
In this case, the accumulated state is just a counter.

And then, for each event, the count is printed out.

Note that in this functional approach, we didn’t have any mutable state, and we didn’t need to create any special classes.
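If you want to see the scan step in isolation, without waiting for a timer, you can trigger the events by hand. This is only a sketch: `clickEvent` and `counts` are invented for the example.

```fsharp
// A hypothetical event source that carries no data, like a button click
let clickEvent = Event<unit>()

// Record each accumulated state as it is emitted
let counts = ResizeArray<int>()

let subscription =
    clickEvent.Publish
    |> Observable.scan (fun count _ -> count + 1) 0
    |> Observable.subscribe (fun count -> counts.Add count)

// Fire three events
for i in 1 .. 3 do
    clickEvent.Trigger ()

printfn "%A" (List.ofSeq counts)   // [1; 2; 3]
```

Each event produces a new state from the previous one, and the state lives inside the pipeline rather than in a mutable field.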

Merging multiple event streams

For a final example, we’ll look at merging multiple event streams.

Let’s make a requirement based on the well-known “FizzBuzz” problem:

    Create two timers, called '3' and '5'. The '3' timer ticks every 300ms and the '5' timer ticks
    every 500ms.

    Handle the events as follows:
    a) for all events, print the id of the timer and the time
    b) when a tick is simultaneous with a previous tick, print 'FizzBuzz'
    otherwise:
    c) when the '3' timer ticks on its own, print 'Fizz'
    d) when the '5' timer ticks on its own, print 'Buzz'

First let’s create some code that both implementations can use.

We’ll want a generic event type that captures the timer id and the time of the tick.

    type FizzBuzzEvent = {label:int; time: DateTime}

And then we need a utility function to see if two events are simultaneous. We’ll be generous and allow a time difference of up to 50ms.

    let areSimultaneous (earlierEvent,laterEvent) =
        let {label=_;time=t1} = earlierEvent
        let {label=_;time=t2} = laterEvent
        // TotalMilliseconds is the whole gap; Milliseconds is only the 0-999 component
        t2.Subtract(t1).TotalMilliseconds < 50.0
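A note on measuring the gap between two timestamps: a TimeSpan exposes both a Milliseconds property (just the millisecond component, from 0 to 999) and a TotalMilliseconds property (the entire span expressed in milliseconds). The two agree only for gaps of less than a second. The sketch below uses made-up timestamps and a hypothetical `isWithin` helper to show the difference.

```fsharp
open System

// Two made-up timestamps, 1.02 seconds apart
let t1 = DateTime(2024, 1, 1, 12, 0, 0)
let t2 = t1.AddMilliseconds 1020.0

let gap = t2.Subtract t1

printfn "%i" gap.Milliseconds        // 20 (the component only)
printfn "%f" gap.TotalMilliseconds   // 1020.000000 (the whole gap)

/// Hypothetical helper: is `later` less than `windowMs` milliseconds after `earlier`?
let isWithin (windowMs: float) (earlier: DateTime) (later: DateTime) =
    (later - earlier).TotalMilliseconds < windowMs

printfn "%b" (isWithin 50.0 t1 t2)                         // false
printfn "%b" (isWithin 50.0 t1 (t1.AddMilliseconds 20.0))  // true
```

With timers that tick every few hundred milliseconds the gaps stay well under a second, so either property gives the same answer here, but TotalMilliseconds is the safer choice in general.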

In the imperative design, we’ll need to keep track of the previous event, so we can compare them.
And we’ll need special case code for the first time, when the previous event doesn’t exist.

    type ImperativeFizzBuzzHandler() =

        let mutable previousEvent: FizzBuzzEvent option = None

        let printEvent thisEvent =
            let {label=id; time=t} = thisEvent
            printf "[%i] %i.%03i " id t.Second t.Millisecond
            let simultaneous = previousEvent.IsSome && areSimultaneous (previousEvent.Value,thisEvent)
            if simultaneous then printfn "FizzBuzz"
            elif id = 3 then printfn "Fizz"
            elif id = 5 then printfn "Buzz"

        member this.handleEvent3 eventArgs =
            let event = {label=3; time=DateTime.Now}
            printEvent event
            previousEvent <- Some event

        member this.handleEvent5 eventArgs =
            let event = {label=5; time=DateTime.Now}
            printEvent event
            previousEvent <- Some event

Now the code is beginning to get ugly fast! Already we have mutable state, complex conditional logic, and special cases, just for such a simple requirement.

Let’s test it:

    // create the class
    let handler = new ImperativeFizzBuzzHandler()

    // create the two timers and register the two handlers
    let timer3 = createTimer 300 handler.handleEvent3
    let timer5 = createTimer 500 handler.handleEvent5

    // run the two timers at the same time
    [timer3;timer5]
    |> Async.Parallel
    |> Async.RunSynchronously

It does work, but are you sure the code is not buggy? Are you likely to accidentally break something if you change it?

The problem with this imperative code is that it has a lot of noise that obscures the requirements.

Can the functional version do better? Let’s see!

First, we create two event streams, one for each timer:

    let timer3, timerEventStream3 = createTimerAndObservable 300
    let timer5, timerEventStream5 = createTimerAndObservable 500

Next, we convert each event on the “raw” event streams into our FizzBuzz event type:

    // convert the time events into FizzBuzz events with the appropriate id
    let eventStream3 =
        timerEventStream3
        |> Observable.map (fun _ -> {label=3; time=DateTime.Now})

    let eventStream5 =
        timerEventStream5
        |> Observable.map (fun _ -> {label=5; time=DateTime.Now})

Now, to see if two events are simultaneous, we need to compare them from the two different streams somehow.

It’s actually easier than it sounds, because we can:

  • combine the two streams into a single stream
  • then create pairs of sequential events
  • then test the pairs to see if they are simultaneous
  • then split the input stream into two new output streams based on that test

Here’s the actual code to do this:

    // combine the two streams
    let combinedStream =
        Observable.merge eventStream3 eventStream5

    // make pairs of events
    let pairwiseStream =
        combinedStream |> Observable.pairwise

    // split the stream based on whether the pairs are simultaneous
    let simultaneousStream, nonSimultaneousStream =
        pairwiseStream |> Observable.partition areSimultaneous
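To see what merge, pairwise and partition do without the timing noise, here is a small sketch of the same pipeline using plain integers. The `left` and `right` events and the “differs by at most 1” test are stand-ins invented for this example; they play the roles of the two timers and the areSimultaneous check.

```fsharp
// Two hypothetical event sources that we can trigger by hand
let left = Event<int>()
let right = Event<int>()

// Merge the two streams, then pair each event with the one before it
let pairs =
    Observable.merge left.Publish right.Publish
    |> Observable.pairwise

// Split the pairs: "close" pairs differ by at most 1, the rest are "far"
let closePairs, farPairs =
    pairs |> Observable.partition (fun (a, b) -> abs (b - a) <= 1)

let closeLog = ResizeArray<int * int>()
let farLog = ResizeArray<int * int>()
let sub1 = closePairs |> Observable.subscribe (fun p -> closeLog.Add p)
let sub2 = farPairs |> Observable.subscribe (fun p -> farLog.Add p)

// Interleave events from both sources
left.Trigger 1
right.Trigger 2
left.Trigger 10
right.Trigger 11

printfn "close: %A" (List.ofSeq closeLog)   // close: [(1, 2); (10, 11)]
printfn "far:   %A" (List.ofSeq farLog)     // far:   [(2, 10)]
```

Note that pairwise emits nothing for the very first event, because there is no previous event to pair it with. That is the “first time” special case from the imperative version, handled for us.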

Finally, we can split the nonSimultaneousStream again, based on the event id:

    // split the non-simultaneous stream based on the id
    let fizzStream, buzzStream =
        nonSimultaneousStream
        // convert pair of events to the first event
        |> Observable.map (fun (ev1,_) -> ev1)
        // split on whether the event id is three
        |> Observable.partition (fun {label=id} -> id=3)

Let’s review so far. We have started with the two original event streams and from them created four new ones:

  • combinedStream contains all the events
  • simultaneousStream contains only the simultaneous events
  • fizzStream contains only the non-simultaneous events with id=3
  • buzzStream contains only the non-simultaneous events with id=5

Now all we need to do is attach behavior to each stream:

    //print events from the combinedStream
    combinedStream
    |> Observable.subscribe (fun {label=id;time=t} ->
        printf "[%i] %i.%03i " id t.Second t.Millisecond)

    //print events from the simultaneous stream
    simultaneousStream
    |> Observable.subscribe (fun _ -> printfn "FizzBuzz")

    //print events from the nonSimultaneous streams
    fizzStream
    |> Observable.subscribe (fun _ -> printfn "Fizz")

    buzzStream
    |> Observable.subscribe (fun _ -> printfn "Buzz")

Let’s test it:

    // run the two timers at the same time
    [timer3;timer5]
    |> Async.Parallel
    |> Async.RunSynchronously

Here’s all the code in one complete set:

    // create the event streams and raw observables
    let timer3, timerEventStream3 = createTimerAndObservable 300
    let timer5, timerEventStream5 = createTimerAndObservable 500

    // convert the time events into FizzBuzz events with the appropriate id
    let eventStream3 =
        timerEventStream3
        |> Observable.map (fun _ -> {label=3; time=DateTime.Now})
    let eventStream5 =
        timerEventStream5
        |> Observable.map (fun _ -> {label=5; time=DateTime.Now})

    // combine the two streams
    let combinedStream =
        Observable.merge eventStream3 eventStream5

    // make pairs of events
    let pairwiseStream =
        combinedStream |> Observable.pairwise

    // split the stream based on whether the pairs are simultaneous
    let simultaneousStream, nonSimultaneousStream =
        pairwiseStream |> Observable.partition areSimultaneous

    // split the non-simultaneous stream based on the id
    let fizzStream, buzzStream =
        nonSimultaneousStream
        // convert pair of events to the first event
        |> Observable.map (fun (ev1,_) -> ev1)
        // split on whether the event id is three
        |> Observable.partition (fun {label=id} -> id=3)

    //print events from the combinedStream
    combinedStream
    |> Observable.subscribe (fun {label=id;time=t} ->
        printf "[%i] %i.%03i " id t.Second t.Millisecond)

    //print events from the simultaneous stream
    simultaneousStream
    |> Observable.subscribe (fun _ -> printfn "FizzBuzz")

    //print events from the nonSimultaneous streams
    fizzStream
    |> Observable.subscribe (fun _ -> printfn "Fizz")

    buzzStream
    |> Observable.subscribe (fun _ -> printfn "Buzz")

    // run the two timers at the same time
    [timer3;timer5]
    |> Async.Parallel
    |> Async.RunSynchronously

The code might seem a bit long-winded, but this kind of incremental, step-wise approach is very clear and self-documenting.

Some of the benefits of this style are:

  • I can see that it meets the requirements just by looking at it, without even running it. Not so with the imperative version.
  • From a design point of view, each final “output” stream follows the single responsibility principle — it only does one thing — so it is very easy to
    associate behavior with it.
  • This code has no conditionals, no mutable state, no edge cases. It would be easy to maintain or change, I hope.
  • It is easy to debug. For example, I could easily “tap” the output of the simultaneousStream to see if it
    contains what I think it contains:
        // debugging code
        //simultaneousStream |> Observable.subscribe (fun e -> printfn "sim %A" e)
        //nonSimultaneousStream |> Observable.subscribe (fun e -> printfn "non-sim %A" e)

This would be much harder in the imperative version.
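If you find yourself tapping streams a lot, the idea can be wrapped in a small helper. The `tap` function below is not part of the Observable module; it is a hypothetical helper, sketched here on the assumption that logging from inside a map is acceptable for debugging purposes.

```fsharp
open System

/// Hypothetical helper: print each item as it passes, then hand it on unchanged
let tap (label: string) (stream: IObservable<'T>) : IObservable<'T> =
    stream
    |> Observable.map (fun item ->
        printfn "[%s] %A" label item
        item)

// A made-up event source, triggered by hand
let source = Event<int>()
let seen = ResizeArray<int>()

let subscription =
    source.Publish
    |> tap "raw"                            // see everything coming in
    |> Observable.filter (fun n -> n > 1)
    |> tap "filtered"                       // see what survives the filter
    |> Observable.subscribe (fun n -> seen.Add n)

for n in 1 .. 3 do
    source.Trigger n

printfn "%A" (List.ofSeq seen)   // [2; 3]
```

Because the helper returns the stream unchanged, it can be dropped into any point of a pipeline and removed again without touching the surrounding steps.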

Summary

Functional Reactive Programming (known as FRP) is a big topic, and we’ve only just touched on it here. I hope this introduction has given you a glimpse of the usefulness of this way of doing things.

If you want to learn more, see the documentation for the F# Observable module, which has the basic transformations used above.
And there is also the Reactive Extensions (Rx) library, which builds on the IObservable interface that shipped as part of .NET 4 and contains many additional transformations.