Single process pub-sub

The previous example was admittedly quite simple. Let’s build on that foundation (pun intended) to do something a bit more interesting. Suppose we have a workflow on our site like the following:

  1. Enter some information on page X, and submit.

  2. Submission starts a background job, and the user is redirected to a page to view status of that job.

  3. That second page will subscribe to updates from the background job and display them to the user.

The core principle here is the ability to let one thread publish updates, and have another thread subscribe to receive those updates. This is known generally as pub/sub, and fortunately is very easy to achieve in Haskell via STM.
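
If you haven't seen STM broadcast channels before, here's a minimal, Yesod-free sketch of the idea (the thread names and message strings are purely illustrative): one thread writes to a broadcast TChan, and every subscriber reads from its own duplicate of that channel, so each of them sees every message.

    import Control.Concurrent     (forkIO, threadDelay)
    import Control.Concurrent.STM
    import Control.Monad          (replicateM_)

    main :: IO ()
    main = do
        -- A broadcast channel: writes are visible to every duplicate,
        -- but only to duplicates made before the write.
        chan <- newBroadcastTChanIO

        -- Give each subscriber its own copy of the channel up front.
        chan1 <- atomically $ dupTChan chan
        chan2 <- atomically $ dupTChan chan

        let subscriber name myChan = replicateM_ 3 $ do
                msg <- atomically $ readTChan myChan
                putStrLn $ name ++ " received: " ++ msg

        _ <- forkIO $ subscriber "sub1" chan1
        _ <- forkIO $ subscriber "sub2" chan2

        -- The publisher just writes to the original channel.
        mapM_ (\msg -> do
                atomically $ writeTChan chan msg
                threadDelay 100000)
              ["one", "two", "three"]

        threadDelay 500000 -- crude: give the subscribers time to print

In the Yesod application below, the background job and the progress-viewing handler play exactly these publisher and subscriber roles.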

As in the previous chapter, let me start off with a caveat: this technique only works properly if you have a single web application process. If you have two different servers and a load balancer, you’d either need sticky sessions or some other solution to make sure that requests from a single user go to the same machine. In those situations, you may want to consider using an external pub/sub solution, such as Redis.

With that caveat out of the way, let’s get started.

Foundation datatype

We’ll need two different mutable references in our foundation. The first will keep track of the next “job id” we’ll hand out. Each background job will be represented by a unique identifier, which will be used in our URLs. The second piece of data will be a map from the job ID to the broadcast channel used for publishing updates. In code:

    data App = App
        { jobs    :: TVar (IntMap (TChan (Maybe Text)))
        , nextJob :: TVar Int
        }

Notice that our TChan contains Maybe Text values. The Maybe wrapper lets us signal that the channel is complete by writing a Nothing value.
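
These two TVars are created once, at application startup. The main function at the end of this chapter does this inline; as a sketch, with mkFoundation as an invented name, the initialization would look like this:

    -- Sketch: build the initial foundation with an empty job map and a
    -- starting counter of 1 (main below does the same thing inline).
    mkFoundation :: IO App
    mkFoundation = do
        jobs    <- newTVarIO IntMap.empty
        nextJob <- newTVarIO 1
        return App {..} -- RecordWildCards fills the fields from these bindings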

Allocating a job

In order to allocate a job, we need to:

  1. Get a job ID.

  2. Create a new broadcast channel.

  3. Add the channel to the channel map.

Due to the beauty of STM, this is pretty easy.

    (jobId, chan) <- liftIO $ atomically $ do
        jobId <- readTVar nextJob
        writeTVar nextJob $! jobId + 1
        chan <- newBroadcastTChan
        m <- readTVar jobs
        writeTVar jobs $ IntMap.insert jobId chan m
        return (jobId, chan)
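
If you prefer, the whole transaction can be pulled out of the handler into a small helper. This is just a sketch; allocateJob is an invented name, not something from the example above:

    -- Hypothetical helper: atomically claim the next job ID, create its
    -- broadcast channel, and register the channel in the job map.
    allocateJob :: App -> STM (Int, TChan (Maybe Text))
    allocateJob App {..} = do
        jobId <- readTVar nextJob
        writeTVar nextJob $! jobId + 1
        chan <- newBroadcastTChan
        modifyTVar' jobs (IntMap.insert jobId chan)
        return (jobId, chan)

A handler could then bind the foundation with app <- getYesod and run the transaction as (jobId, chan) <- liftIO $ atomically $ allocateJob app.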

Fork our background job

There are many different ways we could go about this, and they depend entirely on what the background job is going to be. Here’s a minimal example of a background job that publishes a few messages, with a one-second delay between each. Note how, after our final message, we broadcast a Nothing value and remove our channel from the map of channels.

    liftIO $ forkIO $ do
        threadDelay 1000000
        atomically $ writeTChan chan $ Just "Did something\n"
        threadDelay 1000000
        atomically $ writeTChan chan $ Just "Did something else\n"
        threadDelay 1000000
        atomically $ do
            writeTChan chan $ Just "All done\n"
            writeTChan chan Nothing
            m <- readTVar jobs
            writeTVar jobs $ IntMap.delete jobId m
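
Since the body of the job is entirely up to you, one common shape is to publish one update per unit of work and clean up once everything is done. Here's a sketch along those lines, reusing the App type from above and the OverloadedStrings extension from the full program; runJob and its list of work items are invented for illustration:

    -- Sketch: publish one update per work item, then signal completion
    -- and remove this job's channel from the map.
    runJob :: App -> Int -> TChan (Maybe Text) -> [Text] -> IO ()
    runJob App {..} jobId chan items = do
        mapM_ step items
        atomically $ do
            writeTChan chan Nothing                -- tell viewers we're done
            modifyTVar' jobs (IntMap.delete jobId) -- forget the channel
      where
        step item = do
            -- the real work for this item would happen here
            atomically $ writeTChan chan $ Just (item <> "\n")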

View progress

For this demonstration, I’ve opted for a very simple progress view: a plain-text page with a streaming response. There are a few other possibilities here: an HTML page that auto-refreshes every X seconds, or using EventSource or WebSockets. I encourage you to give those a shot as well, but here’s the simplest implementation I can think of:

    getViewProgressR jobId = do
        App {..} <- getYesod
        mchan <- liftIO $ atomically $ do
            m <- readTVar jobs
            case IntMap.lookup jobId m of
                Nothing -> return Nothing
                Just chan -> fmap Just $ dupTChan chan
        case mchan of
            Nothing -> notFound
            Just chan -> respondSource typePlain $ do
                let loop = do
                        mtext <- liftIO $ atomically $ readTChan chan
                        case mtext of
                            Nothing -> return ()
                            Just text -> do
                                sendChunkText text
                                sendFlush
                                loop
                loop

We start off by looking up the channel in the map. If we can’t find it, it means the job either never existed, or has already been completed. In either event, we return a 404. (Another possible enhancement would be to store some information on all previously completed jobs and let the user know if they’re done.)
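
That enhancement might look roughly like the following sketch. It is not part of the example: the completed field, the IntSet import, and lookupJob are all invented, and the job's cleanup code would also need to insert its ID into the new set when it finishes.

    import           Data.IntSet (IntSet)
    import qualified Data.IntSet as IntSet

    -- Hypothetical foundation that also remembers finished jobs.
    data App = App
        { jobs      :: TVar (IntMap (TChan (Maybe Text)))
        , completed :: TVar IntSet
        , nextJob   :: TVar Int
        }

    -- A failed lookup can then distinguish "already finished" (Left True)
    -- from "never existed" (Left False).
    lookupJob :: App -> Int -> STM (Either Bool (TChan (Maybe Text)))
    lookupJob App {..} jobId = do
        m <- readTVar jobs
        case IntMap.lookup jobId m of
            Just chan -> Right <$> dupTChan chan
            Nothing   -> Left . IntSet.member jobId <$> readTVar completed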

Assuming the channel exists, we use respondSource to start a streaming response. We then repeatedly call readTChan until we get a Nothing value, at which point we exit (via return ()). Notice that on each iteration, we call both sendChunkText and sendFlush. Without that second call, the user won’t receive any updates until the output buffer completely fills up, which is not what we want for a real-time update system.

Complete application

For completeness, here’s the full source code for this application:

    {-# LANGUAGE OverloadedStrings #-}
    {-# LANGUAGE QuasiQuotes       #-}
    {-# LANGUAGE RecordWildCards   #-}
    {-# LANGUAGE TemplateHaskell   #-}
    {-# LANGUAGE TypeFamilies      #-}
    import           Control.Concurrent     (forkIO, threadDelay)
    import           Control.Concurrent.STM
    import           Data.IntMap            (IntMap)
    import qualified Data.IntMap            as IntMap
    import           Data.Text              (Text)
    import           Yesod

    data App = App
        { jobs    :: TVar (IntMap (TChan (Maybe Text)))
        , nextJob :: TVar Int
        }

    mkYesod "App" [parseRoutes|
    / HomeR GET POST
    /view-progress/#Int ViewProgressR GET
    |]

    instance Yesod App

    getHomeR :: Handler Html
    getHomeR = defaultLayout $ do
        setTitle "PubSub example"
        [whamlet|
            <form method=post>
                <button>Start new background job
        |]

    postHomeR :: Handler ()
    postHomeR = do
        App {..} <- getYesod
        (jobId, chan) <- liftIO $ atomically $ do
            jobId <- readTVar nextJob
            writeTVar nextJob $! jobId + 1
            chan <- newBroadcastTChan
            m <- readTVar jobs
            writeTVar jobs $ IntMap.insert jobId chan m
            return (jobId, chan)
        liftIO $ forkIO $ do
            threadDelay 1000000
            atomically $ writeTChan chan $ Just "Did something\n"
            threadDelay 1000000
            atomically $ writeTChan chan $ Just "Did something else\n"
            threadDelay 1000000
            atomically $ do
                writeTChan chan $ Just "All done\n"
                writeTChan chan Nothing
                m <- readTVar jobs
                writeTVar jobs $ IntMap.delete jobId m
        redirect $ ViewProgressR jobId

    getViewProgressR :: Int -> Handler TypedContent
    getViewProgressR jobId = do
        App {..} <- getYesod
        mchan <- liftIO $ atomically $ do
            m <- readTVar jobs
            case IntMap.lookup jobId m of
                Nothing -> return Nothing
                Just chan -> fmap Just $ dupTChan chan
        case mchan of
            Nothing -> notFound
            Just chan -> respondSource typePlain $ do
                let loop = do
                        mtext <- liftIO $ atomically $ readTChan chan
                        case mtext of
                            Nothing -> return ()
                            Just text -> do
                                sendChunkText text
                                sendFlush
                                loop
                loop

    main :: IO ()
    main = do
        jobs <- newTVarIO IntMap.empty
        nextJob <- newTVarIO 1
        warp 3000 App {..}