CQRS Component

Golang CQRS implementation in Watermill.

CQRS

CQRS means “Command-query responsibility segregation”. We segregate the responsibility between commands (write requests) and queries (read requests). The write requests and the read requests are handled by different objects.

That’s it. We can further split up the data storage, having separate read and write stores. Once that happens, there may be many read stores, optimized for handling different types of queries or spanning many bounded contexts. Though separate read/write stores are often discussed in relation with CQRS, this is not CQRS itself. CQRS is just the first split of commands and queries.

Source: www.cqrs.nu FAQ
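
As a minimal sketch of that first split, assuming a hypothetical in-memory room store (none of the names below come from Watermill), writes can go through one object and reads through another:

```go
package main

import (
	"fmt"
	"sync"
)

// roomStore is a hypothetical shared in-memory store used only for this sketch.
type roomStore struct {
	mu     sync.RWMutex
	booked map[string]string // room ID -> guest name
}

// BookRoomCommand is a write request: it asks the system to change state.
type BookRoomCommand struct {
	RoomID    string
	GuestName string
}

// BookingWriter handles commands and never returns domain data.
type BookingWriter struct{ store *roomStore }

// Handle books the room or returns an error if it is already taken.
func (w BookingWriter) Handle(cmd BookRoomCommand) error {
	w.store.mu.Lock()
	defer w.store.mu.Unlock()
	if _, taken := w.store.booked[cmd.RoomID]; taken {
		return fmt.Errorf("room %s is already booked", cmd.RoomID)
	}
	w.store.booked[cmd.RoomID] = cmd.GuestName
	return nil
}

// BookingReader answers queries and never modifies state.
type BookingReader struct{ store *roomStore }

// GuestOf reports who booked the room, if anyone.
func (r BookingReader) GuestOf(roomID string) (string, bool) {
	r.store.mu.RLock()
	defer r.store.mu.RUnlock()
	guest, ok := r.store.booked[roomID]
	return guest, ok
}

func main() {
	store := &roomStore{booked: map[string]string{}}
	writer := BookingWriter{store: store}
	reader := BookingReader{store: store}

	if err := writer.Handle(BookRoomCommand{RoomID: "1", GuestName: "John"}); err != nil {
		panic(err)
	}
	guest, ok := reader.GuestOf("1")
	fmt.Println(guest, ok)
}
```

Both objects share one store here, which is still CQRS in the sense quoted above; separate read and write stores would be a further, optional step.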

Glossary

[Figure: CQRS schema]

Command

The command is a simple data structure, representing the request for executing some operation.

Command Bus

Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

    // ...
    // CommandBus transports commands to command handlers.
    type CommandBus struct {
        // ...

Command Processor

Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

    // ...
    // CommandProcessor determines which CommandHandler should handle the command received from the command bus.
    type CommandProcessor struct {
        // ...

Command Handler

Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

    // ...
    // CommandHandler receives a command defined by NewCommand and handles it with the Handle method.
    // If using DDD, CommandHandler may modify and persist the aggregate.
    //
    // In contrast to EventHandler, every Command must have only one CommandHandler.
    type CommandHandler interface {
        // ...
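
The "only one CommandHandler per Command" rule can be illustrated with a small registry that rejects a second handler for the same command type. This is a sketch under our own assumptions: `registry`, `handlerFunc` and `ErrDuplicateHandler` are hypothetical names, not part of Watermill, and the real CommandProcessor may enforce the rule differently.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrDuplicateHandler is returned when a command already has a handler.
var ErrDuplicateHandler = errors.New("command already has a handler")

// handlerFunc is a hypothetical, simplified stand-in for a command handler.
type handlerFunc func(cmd interface{}) error

// registry maps a command name to exactly one handler.
type registry struct {
	handlers map[string]handlerFunc
}

func newRegistry() *registry {
	return &registry{handlers: map[string]handlerFunc{}}
}

// commandName derives a name from the command's Go type, e.g. "*main.BookRoom".
func commandName(cmd interface{}) string {
	return fmt.Sprintf("%T", cmd)
}

// Register adds a handler for the type of prototype and rejects duplicates.
func (r *registry) Register(prototype interface{}, h handlerFunc) error {
	name := commandName(prototype)
	if _, exists := r.handlers[name]; exists {
		return fmt.Errorf("%w: %s", ErrDuplicateHandler, name)
	}
	r.handlers[name] = h
	return nil
}

// Dispatch routes the command to its single handler.
func (r *registry) Dispatch(cmd interface{}) error {
	h, ok := r.handlers[commandName(cmd)]
	if !ok {
		return fmt.Errorf("no handler for %s", commandName(cmd))
	}
	return h(cmd)
}

// BookRoom is a simplified command used only in this sketch.
type BookRoom struct{ RoomId string }

func main() {
	r := newRegistry()
	noop := func(cmd interface{}) error { return nil }

	fmt.Println(r.Register(&BookRoom{}, noop) == nil)

	// Registering a second handler for the same command type fails.
	err := r.Register(&BookRoom{}, noop)
	fmt.Println(errors.Is(err, ErrDuplicateHandler))
}
```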

Event

The event represents something that already took place. Events are immutable.

Event Bus

Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

    // ...
    // EventBus transports events to event handlers.
    type EventBus struct {
        // ...

Event Processor

Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

    // ...
    // EventProcessor determines which EventHandler should handle the event received from the event bus.
    type EventProcessor struct {
        // ...

Event Handler

Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

    // ...
    // EventHandler receives events defined by NewEvent and handles them with its Handle method.
    // If using DDD, EventHandler may modify and persist the aggregate.
    // It can also invoke a process manager, a saga or just build a read model.
    //
    // In contrast to CommandHandler, every Event can have multiple EventHandlers.
    type EventHandler interface {
        // ...
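
In contrast to commands, one event may reach any number of handlers. The sketch below shows that fan-out in its simplest form. `fanOut` and `eventHandler` are hypothetical names, and the sequential loop is a simplification: with a real Pub/Sub, each handler typically consumes its own copy of the event independently.

```go
package main

import "fmt"

// RoomBooked is a simplified event used only in this sketch.
type RoomBooked struct {
	RoomId string
	Price  int64
}

// eventHandler is a hypothetical, simplified stand-in for an event handler.
type eventHandler func(e RoomBooked) error

// fanOut holds any number of handlers for the same event type.
type fanOut struct {
	handlers []eventHandler
}

// Subscribe adds one more handler; there is no uniqueness check.
func (f *fanOut) Subscribe(h eventHandler) {
	f.handlers = append(f.handlers, h)
}

// Publish delivers the event to every subscribed handler
// and stops at the first error.
func (f *fanOut) Publish(e RoomBooked) error {
	for _, h := range f.handlers {
		if err := h(e); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var total int64
	var beersOrdered int

	f := &fanOut{}
	// First handler: builds a read model.
	f.Subscribe(func(e RoomBooked) error { total += e.Price; return nil })
	// Second handler: reacts to the same event with a side effect.
	f.Subscribe(func(e RoomBooked) error { beersOrdered++; return nil })

	if err := f.Publish(RoomBooked{RoomId: "1", Price: 120}); err != nil {
		panic(err)
	}
	fmt.Println(total, beersOrdered)
}
```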

CQRS Facade

Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/cqrs.go

    // ...
    // Facade is a facade for creating the Command and Event buses and processors.
    // It was created to avoid boilerplate, when using CQRS in the standard way.
    // You can also create buses and processors manually, drawing inspiration from how it's done in NewFacade.
    type Facade struct {
        // ...

Command and Event Marshaler

Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

    // ...
    // CommandEventMarshaler marshals Commands and Events to Watermill's messages and vice versa.
    // Payload of the command needs to be marshaled to []byte.
    type CommandEventMarshaler interface {
        // Marshal marshals Command or Event to Watermill's message.
        Marshal(v interface{}) (*message.Message, error)

        // Unmarshal unmarshals Watermill's message to v Command or Event.
        Unmarshal(msg *message.Message, v interface{}) (err error)

        // Name returns the name of Command or Event.
        // Name is used to determine whether the received command or event is the one we want to handle.
        Name(v interface{}) string

        // NameFromMessage returns the name of Command or Event from Watermill's message (generated by Marshal).
        //
        // When we have Command or Event marshaled to Watermill's message,
        // we should use NameFromMessage instead of Name to avoid unnecessary unmarshaling.
        NameFromMessage(msg *message.Message) string
    }
    // ...
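
To make the four methods more concrete, here is a standard-library sketch of a JSON-based marshaler. It is not Watermill's implementation: `Message` is a hypothetical, simplified stand-in for `message.Message`, and both the `"name"` metadata key and the type-based naming are assumptions made for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Message is a hypothetical, simplified stand-in for a Watermill message.
type Message struct {
	Metadata map[string]string
	Payload  []byte
}

// jsonMarshaler is an illustrative marshaler, not Watermill's implementation.
type jsonMarshaler struct{}

// Name returns the Go type name of v without the pointer prefix,
// e.g. "main.RoomBooked" for *RoomBooked.
func (jsonMarshaler) Name(v interface{}) string {
	return strings.TrimPrefix(fmt.Sprintf("%T", v), "*")
}

// Marshal encodes v as JSON and stores its name in the metadata.
func (m jsonMarshaler) Marshal(v interface{}) (*Message, error) {
	payload, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return &Message{
		Metadata: map[string]string{"name": m.Name(v)},
		Payload:  payload,
	}, nil
}

// Unmarshal decodes the payload into v, which must be a pointer.
func (jsonMarshaler) Unmarshal(msg *Message, v interface{}) error {
	return json.Unmarshal(msg.Payload, v)
}

// NameFromMessage reads the name from metadata without decoding the payload.
func (jsonMarshaler) NameFromMessage(msg *Message) string {
	return msg.Metadata["name"]
}

// RoomBooked is a simplified event used only in this sketch.
type RoomBooked struct {
	RoomId string
	Price  int64
}

func main() {
	m := jsonMarshaler{}
	msg, err := m.Marshal(&RoomBooked{RoomId: "1", Price: 120})
	if err != nil {
		panic(err)
	}

	// Compare names first and unmarshal only on a match.
	if m.NameFromMessage(msg) == m.Name(&RoomBooked{}) {
		var e RoomBooked
		if err := m.Unmarshal(msg, &e); err != nil {
			panic(err)
		}
		fmt.Println(e.RoomId, e.Price)
	}
}
```

Keeping the name in metadata is what lets a processor skip messages it does not handle without paying for a full unmarshal.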

Usage

Example domain

As an example, we will use a simple domain that is responsible for handling room bookings in a hotel.

We will use Event Storming notation to show the model of this domain.

Legend:

  • blue post-its are commands
  • orange post-its are events
  • green post-its are read models, asynchronously generated from events
  • violet post-its are policies, which are triggered by events and produce commands
  • pink post-its are hot-spots; we mark places where problems often occur

[Figure: CQRS Event Storming]

The domain is simple:

  • A Guest is able to book a room.
  • Whenever a room is booked, we order a beer for the guest (because we love our guests).
    • We know that sometimes there are not enough beers.
  • We generate a financial report based on the bookings.
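
The messages of this domain can be sketched as plain Go structs. The field names follow the example code later on this page, but the example itself uses protobuf-generated types, so the `time.Time` fields, the fixed beer count and the `orderBeerPolicy` helper are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// BookRoom is a command (blue post-it).
type BookRoom struct {
	RoomId    string
	GuestName string
	StartDate time.Time
	EndDate   time.Time
}

// RoomBooked is an event (orange post-it).
type RoomBooked struct {
	ReservationId string
	RoomId        string
	GuestName     string
	Price         int64
	StartDate     time.Time
	EndDate       time.Time
}

// OrderBeer is a command produced by the policy (violet post-it).
type OrderBeer struct {
	RoomId string
	Count  int64
}

// orderBeerPolicy turns a RoomBooked event into an OrderBeer command.
// The fixed count of 1 is an assumption made for this sketch.
func orderBeerPolicy(e RoomBooked) OrderBeer {
	return OrderBeer{RoomId: e.RoomId, Count: 1}
}

func main() {
	start := time.Date(2024, 1, 10, 0, 0, 0, 0, time.UTC)
	cmd := BookRoom{
		RoomId:    "1",
		GuestName: "John",
		StartDate: start,
		EndDate:   start.AddDate(0, 0, 3),
	}

	// The event records what already happened as a result of the command.
	event := RoomBooked{
		ReservationId: "r-1",
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         300,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}

	fmt.Printf("%+v\n", orderBeerPolicy(event))
}
```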

Sending a command

To begin, we need to simulate the guest’s action.

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

    // ...
    bookRoomCmd := &BookRoom{
        RoomId:    fmt.Sprintf("%d", i),
        GuestName: "John",
        StartDate: startDate,
        EndDate:   endDate,
    }
    if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
        panic(err)
    }
    // ...

Command handler

BookRoomHandler will handle our command.

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

    // ...
    // BookRoomHandler is a command handler, which handles BookRoom command and emits RoomBooked.
    //
    // In CQRS, one command must be handled by only one handler.
    // When another handler with this command is added to command processor, error will be returned.
    type BookRoomHandler struct {
        eventBus *cqrs.EventBus
    }

    func (b BookRoomHandler) HandlerName() string {
        return "BookRoomHandler"
    }

    // NewCommand returns type of command which this handler should handle. It must be a pointer.
    func (b BookRoomHandler) NewCommand() interface{} {
        return &BookRoom{}
    }

    func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
        // c is always the type returned by `NewCommand`, so casting is always safe
        cmd := c.(*BookRoom)

        // some random price; in production you will probably calculate it in a wiser way
        price := (rand.Int63n(40) + 1) * 10

        log.Printf(
            "Booked %s for %s from %s to %s",
            cmd.RoomId,
            cmd.GuestName,
            time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
            time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
        )

        // RoomBooked will be handled by OrderBeerOnRoomBooked event handler,
        // in future RoomBooked may be handled by multiple event handlers
        if err := b.eventBus.Publish(ctx, &RoomBooked{
            ReservationId: watermill.NewUUID(),
            RoomId:        cmd.RoomId,
            GuestName:     cmd.GuestName,
            Price:         price,
            StartDate:     cmd.StartDate,
            EndDate:       cmd.EndDate,
        }); err != nil {
            return err
        }

        return nil
    }

    // OrderBeerOnRoomBooked is an event handler, which handles RoomBooked event and emits OrderBeer command.
    // ...

Event handler

As mentioned before, we want to order a beer every time a room is booked (the “Whenever a Room is booked” post-it). We do it by sending the OrderBeer command.

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

    // ...
    // OrderBeerOnRoomBooked is an event handler, which handles RoomBooked event and emits OrderBeer command.
    type OrderBeerOnRoomBooked struct {
        commandBus *cqrs.CommandBus
    }

    func (o OrderBeerOnRoomBooked) HandlerName() string {
        // this name is passed to EventsSubscriberConstructor and used to generate queue name
        return "OrderBeerOnRoomBooked"
    }

    func (OrderBeerOnRoomBooked) NewEvent() interface{} {
        return &RoomBooked{}
    }

    func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
        event := e.(*RoomBooked)

        orderBeerCmd := &OrderBeer{
            RoomId: event.RoomId,
            Count:  rand.Int63n(10) + 1,
        }

        return o.commandBus.Send(ctx, orderBeerCmd)
    }

    // OrderBeerHandler is a command handler, which handles OrderBeer command and emits BeerOrdered.
    // ...

OrderBeerHandler is very similar to BookRoomHandler. The only difference is that it sometimes returns an error when there are not enough beers, which causes redelivery of the command. You can find the entire implementation in the example source code.
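
The effect of returning an error can be simulated without a broker. In the sketch below, `deliver` is a hypothetical stand-in for a Pub/Sub that redelivers a command until the handler succeeds, and `beerStock` with its `refill` callback is an assumption made for illustration. It is not the OrderBeerHandler from the example.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotEnoughBeers = errors.New("not enough beers")

// OrderBeer is a simplified command used only in this sketch.
type OrderBeer struct {
	RoomId string
	Count  int64
}

// beerStock is a hypothetical stock that is refilled between attempts.
type beerStock struct{ available int64 }

// handle returns an error when the stock is too low,
// signalling that the command should be delivered again.
func (s *beerStock) handle(cmd OrderBeer) error {
	if cmd.Count > s.available {
		return errNotEnoughBeers
	}
	s.available -= cmd.Count
	return nil
}

// deliver simulates a broker that redelivers a command until the handler
// succeeds or maxAttempts is reached. refill runs after each failed attempt.
// It returns the number of attempts made and the last error.
func deliver(cmd OrderBeer, handler func(OrderBeer) error, maxAttempts int, refill func()) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = handler(cmd); err == nil {
			return attempt, nil
		}
		refill()
	}
	return maxAttempts, err
}

func main() {
	stock := &beerStock{available: 1}
	cmd := OrderBeer{RoomId: "1", Count: 3}

	// One beer arrives after every failed attempt.
	attempts, err := deliver(cmd, stock.handle, 5, func() { stock.available++ })
	fmt.Println(attempts, err, stock.available)
}
```

Because a redelivered command runs the handler again from the start, handlers written this way should tolerate being invoked more than once for the same command.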

Building a read model with the event handler

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

    // ...
    // BookingsFinancialReport is a read model, which calculates how much money we may earn from bookings.
    // Like OrderBeerOnRoomBooked, it listens for RoomBooked event.
    //
    // This implementation is just writing to the memory. In production, you will probably use some persistent storage.
    type BookingsFinancialReport struct {
        handledBookings map[string]struct{}
        totalCharge     int64
        lock            sync.Mutex
    }

    func NewBookingsFinancialReport() *BookingsFinancialReport {
        return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
    }

    func (b BookingsFinancialReport) HandlerName() string {
        // this name is passed to EventsSubscriberConstructor and used to generate queue name
        return "BookingsFinancialReport"
    }

    func (BookingsFinancialReport) NewEvent() interface{} {
        return &RoomBooked{}
    }

    func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
        // Handle may be called concurrently, so it needs to be thread safe.
        b.lock.Lock()
        defer b.lock.Unlock()

        event := e.(*RoomBooked)

        // When we are using Pub/Sub which doesn't provide exactly-once delivery semantics, we need to deduplicate messages.
        // GoChannel Pub/Sub provides exactly-once delivery,
        // but let's make this example ready for other Pub/Sub implementations.
        if _, ok := b.handledBookings[event.ReservationId]; ok {
            return nil
        }
        b.handledBookings[event.ReservationId] = struct{}{}

        b.totalCharge += event.Price

        fmt.Printf(">>> Already booked rooms for $%d\n", b.totalCharge)
        return nil
    }

    var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

    func main() {
        // ...
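
To see why the deduplication matters, a stripped-down version of this read model can be fed the same event twice from concurrent goroutines. The `report` type below is a simplified stand-in written for this sketch, without the Watermill handler interface.

```go
package main

import (
	"fmt"
	"sync"
)

// RoomBooked is a simplified event used only in this sketch.
type RoomBooked struct {
	ReservationId string
	Price         int64
}

// report is a simplified in-memory read model.
type report struct {
	mu      sync.Mutex
	handled map[string]struct{}
	total   int64
}

func newReport() *report {
	return &report{handled: map[string]struct{}{}}
}

// Handle adds the price once per reservation, ignoring duplicates.
func (r *report) Handle(e RoomBooked) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, seen := r.handled[e.ReservationId]; seen {
		return
	}
	r.handled[e.ReservationId] = struct{}{}
	r.total += e.Price
}

// Total returns the sum of all distinct bookings handled so far.
func (r *report) Total() int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.total
}

func main() {
	r := newReport()
	events := []RoomBooked{
		{ReservationId: "a", Price: 100},
		{ReservationId: "b", Price: 50},
		{ReservationId: "a", Price: 100}, // duplicate delivery of "a"
	}

	// Deliver concurrently, as a router with several workers might.
	var wg sync.WaitGroup
	for _, e := range events {
		wg.Add(1)
		go func(e RoomBooked) {
			defer wg.Done()
			r.Handle(e)
		}(e)
	}
	wg.Wait()

	fmt.Println(r.Total())
}
```

Without the `handled` set, the duplicate would be counted twice and the report would overstate the revenue.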

Wiring it up - the CQRS facade

We have all the blocks to build our CQRS application. We now need to use some kind of glue to wire it up.

The example below uses RabbitMQ (AMQP) as the messaging infrastructure. Any other Watermill Pub/Sub implementation, such as the in-memory GoChannel, can be used instead.

Under the hood, CQRS uses Watermill’s message router. If you are not familiar with it and want to learn how it works, check the Getting Started guide. It will also show you how to use some standard messaging patterns, like metrics, poison queue, throttling, correlation and other tools used by every message-driven application. Those come built-in with Watermill.

Let’s go back to CQRS. As you already know, CQRS is built from multiple components, like Command and Event buses, handlers, processors, etc. To simplify creating all these building blocks, we created cqrs.Facade, which creates all of them.

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

    // ...
    func main() {
        logger := watermill.NewStdLogger(false, false)
        cqrsMarshaler := cqrs.ProtobufMarshaler{}

        // You can use any Pub/Sub implementation from here: https://watermill.io/docs/pub-sub-implementations/
        // Detailed RabbitMQ implementation: https://watermill.io/docs/pub-sub-implementations/#rabbitmq-amqp
        // Commands will be sent to a queue, because they need to be consumed once.
        commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
        commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
        if err != nil {
            panic(err)
        }
        commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
        if err != nil {
            panic(err)
        }

        // Events will be published to Rabbit configured as Pub/Sub, because they may be consumed by multiple consumers
        // (in this case BookingsFinancialReport and OrderBeerOnRoomBooked).
        eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
        if err != nil {
            panic(err)
        }

        // CQRS is built on the message router. Detailed documentation: https://watermill.io/docs/messages-router/
        router, err := message.NewRouter(message.RouterConfig{}, logger)
        if err != nil {
            panic(err)
        }

        // Simple middleware which will recover panics from event or command handlers.
        // You can find more about router middlewares in the documentation:
        // https://watermill.io/docs/messages-router/#middleware
        //
        // You can find the list of available middlewares in message/router/middleware.
        router.AddMiddleware(middleware.Recoverer)

        // cqrs.Facade is a facade for Command and Event buses and processors.
        // You can use the facade, or create buses and processors manually (you can take inspiration from cqrs.NewFacade)
        cqrsFacade, err := cqrs.NewFacade(cqrs.FacadeConfig{
            GenerateCommandsTopic: func(commandName string) string {
                // we are using queue RabbitMQ config, so we need to have topic per command type
                return commandName
            },
            CommandHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.CommandHandler {
                return []cqrs.CommandHandler{
                    BookRoomHandler{eb},
                    OrderBeerHandler{eb},
                }
            },
            CommandsPublisher: commandsPublisher,
            CommandsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
                // we can reuse subscriber, because all commands have separated topics
                return commandsSubscriber, nil
            },
            GenerateEventsTopic: func(eventName string) string {
                // because we are using PubSub RabbitMQ config, we can use one topic for all events
                return "events"

                // we can also use topic per event type
                // return eventName
            },
            EventHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.EventHandler {
                return []cqrs.EventHandler{
                    OrderBeerOnRoomBooked{cb},
                    NewBookingsFinancialReport(),
                }
            },
            EventsPublisher: eventsPublisher,
            EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
                config := amqp.NewDurablePubSubConfig(
                    amqpAddress,
                    amqp.GenerateQueueNameTopicNameWithSuffix(handlerName),
                )

                return amqp.NewSubscriber(config, logger)
            },
            Router:                router,
            CommandEventMarshaler: cqrsMarshaler,
            Logger:                logger,
        })
        if err != nil {
            panic(err)
        }

        // publish BookRoom commands every second to simulate incoming traffic
        go publishCommands(cqrsFacade.CommandBus())

        // processors are based on router, so they will start working when the router starts
        if err := router.Run(context.Background()); err != nil {
            panic(err)
        }
    }
    // ...

And that’s all. We have a working CQRS application.
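
The naming choices in the configuration above decide who receives what. The sketch below restates them as three plain functions. The function names are hypothetical, and the `topic_handler` queue format is an assumption about how a per-handler suffix could be applied, so treat it as an illustration rather than the AMQP package's exact behavior.

```go
package main

import "fmt"

// commandsTopic gives each command type its own topic (queue),
// so each command is consumed by exactly one handler.
func commandsTopic(commandName string) string {
	return commandName
}

// eventsTopic puts all events on one shared topic.
func eventsTopic(eventName string) string {
	return "events"
}

// eventsQueue gives every event handler its own queue on the shared topic,
// so each handler receives its own copy of every event.
// The "topic_handler" format is an assumption of this sketch.
func eventsQueue(topic, handlerName string) string {
	return topic + "_" + handlerName
}

func main() {
	fmt.Println(commandsTopic("main.BookRoom"))
	fmt.Println(commandsTopic("main.OrderBeer"))

	topic := eventsTopic("main.RoomBooked")
	fmt.Println(eventsQueue(topic, "OrderBeerOnRoomBooked"))
	fmt.Println(eventsQueue(topic, "BookingsFinancialReport"))
}
```

Two handlers sharing one queue name would compete for events instead of each receiving all of them, which is why the handler name is part of the queue name.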

What’s next?

As mentioned before, if you are not familiar with Watermill, we highly recommend reading the Getting Started guide.