CQRS

The flow of the simplest CRUD applications can be described using the following steps:

  1. The controllers layer handles HTTP requests and delegates tasks to the services.
  2. The services layer is where most of the business logic lives.
  3. Services use repositories / DAOs to change and persist entities.
  4. Entities act as containers for values, with setters and getters.
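The four layers above can be pictured with a minimal, framework-free sketch. Every class name here is hypothetical, and plain classes stand in for what a framework would normally wire together.

```typescript
// Hypothetical names throughout; plain classes stand in for framework-managed layers.

// 4. Entity: a container for values with getters and setters.
class HeroEntity {
  private name: string;
  constructor(public readonly id: number, name: string) {
    this.name = name;
  }
  getName(): string { return this.name; }
  setName(name: string): void { this.name = name; }
}

// 3. Repository: changes and persists entities (in memory here).
class HeroEntityRepository {
  private readonly rows = new Map<number, HeroEntity>();
  save(hero: HeroEntity): void { this.rows.set(hero.id, hero); }
  findById(id: number): HeroEntity | undefined { return this.rows.get(id); }
}

// 2. Service: holds the business logic.
class HeroCrudService {
  constructor(private readonly repository: HeroEntityRepository) {}
  rename(id: number, name: string): HeroEntity {
    const hero = this.repository.findById(id);
    if (!hero) throw new Error(`Hero ${id} not found`);
    if (name.trim() === '') throw new Error('Name must not be empty');
    hero.setName(name.trim());
    this.repository.save(hero);
    return hero;
  }
}

// 1. Controller: receives the request and delegates to the service.
class HeroCrudController {
  constructor(private readonly service: HeroCrudService) {}
  patchName(params: { id: string }, body: { name: string }): { id: number; name: string } {
    const hero = this.service.rename(Number(params.id), body.name);
    return { id: hero.id, name: hero.getName() };
  }
}

const crudRepository = new HeroEntityRepository();
crudRepository.save(new HeroEntity(1, 'Unnamed'));
const crudController = new HeroCrudController(new HeroCrudService(crudRepository));
const crudResponse = crudController.patchName({ id: '1' }, { name: ' Sigurd ' });
```

Reads and writes share the same service and entity here, which is exactly the coupling that the rest of this chapter separates.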

In most cases, there is no reason to make small and medium-sized applications more complex. Sometimes, however, this is not enough: when our needs become more sophisticated, we want scalable systems with a straightforward data flow.

Hence, we provide a lightweight CQRS module whose elements are described below.

Commands

To make the application easier to understand, each change has to be preceded by a Command. When a command is dispatched, the application has to react to it. Commands can be dispatched from the services (or directly from the controllers/gateways) and are consumed by the corresponding Command Handlers.

  @@filename(heroes-game.service)
  import { Injectable } from '@nestjs/common';
  import { CommandBus } from '@nestjs/cqrs';

  @Injectable()
  export class HeroesGameService {
    constructor(private readonly commandBus: CommandBus) {}

    async killDragon(heroId: string, killDragonDto: KillDragonDto) {
      // Dispatch the command; the matching handler performs the change.
      return this.commandBus.execute(
        new KillDragonCommand(heroId, killDragonDto.dragonId)
      );
    }
  }
  @@switch
  import { Injectable, Dependencies } from '@nestjs/common';
  import { CommandBus } from '@nestjs/cqrs';

  @Injectable()
  @Dependencies(CommandBus)
  export class HeroesGameService {
    constructor(commandBus) {
      this.commandBus = commandBus;
    }

    async killDragon(heroId, killDragonDto) {
      // Dispatch the command; the matching handler performs the change.
      return this.commandBus.execute(
        new KillDragonCommand(heroId, killDragonDto.dragonId)
      );
    }
  }

Here's a sample service that dispatches KillDragonCommand. Let's see what the command looks like:

  @@filename(kill-dragon.command)
  export class KillDragonCommand {
    constructor(
      public readonly heroId: string,
      public readonly dragonId: string,
    ) {}
  }
  @@switch
  export class KillDragonCommand {
    constructor(heroId, dragonId) {
      this.heroId = heroId;
      this.dragonId = dragonId;
    }
  }

The CommandBus is a stream of commands. It delegates each command to its corresponding handler. Every command must have a corresponding Command Handler:

  @@filename(kill-dragon.handler)
  import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';

  @CommandHandler(KillDragonCommand)
  export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
    constructor(private readonly repository: HeroRepository) {}

    async execute(command: KillDragonCommand) {
      const { heroId, dragonId } = command;
      // Load the hero first; the lookup is awaited before the model is used.
      const hero = await this.repository.findOneById(+heroId);

      hero.killEnemy(dragonId);
      await this.repository.persist(hero);
    }
  }
  @@switch
  import { Dependencies } from '@nestjs/common';
  import { CommandHandler } from '@nestjs/cqrs';

  @CommandHandler(KillDragonCommand)
  @Dependencies(HeroRepository)
  export class KillDragonHandler {
    constructor(repository) {
      this.repository = repository;
    }

    async execute(command) {
      const { heroId, dragonId } = command;
      // Load the hero first; the lookup is awaited before the model is used.
      const hero = await this.repository.findOneById(+heroId);

      hero.killEnemy(dragonId);
      await this.repository.persist(hero);
    }
  }

Now every application state change is the result of a Command. The logic is encapsulated in handlers. If we want, we can simply add logging here, or even persist our commands in the database (e.g. for diagnostic purposes).
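To make the delegation idea concrete, here is a minimal, dependency-free sketch of what a command bus does conceptually: it looks up the handler registered for a command's class, records the command, and calls the handler. This is a simplified stand-in, not the @nestjs/cqrs implementation. All `Sketch` names are hypothetical, and `execute()` is synchronous here only for brevity, whereas the handlers shown above are asynchronous.

```typescript
// Simplified stand-in for a command bus; NOT the @nestjs/cqrs implementation.
type CommandClass<C> = new (...args: any[]) => C;

interface SketchCommandHandler<C, R> {
  execute(command: C): R;
}

class SketchCommandBus {
  private readonly handlers = new Map<Function, SketchCommandHandler<any, any>>();
  // Every dispatched command is recorded, e.g. for diagnostics.
  readonly log: string[] = [];

  register<C, R>(type: CommandClass<C>, handler: SketchCommandHandler<C, R>): void {
    this.handlers.set(type, handler);
  }

  execute<R>(command: object): R {
    // The command's class is the lookup key for its handler.
    const handler = this.handlers.get(command.constructor);
    if (!handler) {
      throw new Error(`No handler registered for ${command.constructor.name}`);
    }
    this.log.push(command.constructor.name);
    return handler.execute(command);
  }
}

class KillDragonSketchCommand {
  constructor(public readonly heroId: string, public readonly dragonId: string) {}
}

const sketchBus = new SketchCommandBus();
sketchBus.register(KillDragonSketchCommand, {
  execute: (command: KillDragonSketchCommand) =>
    `${command.heroId} killed ${command.dragonId}`,
});
const killResult = sketchBus.execute<string>(
  new KillDragonSketchCommand('hero-1', 'dragon-7'),
);
```

Because every change passes through one `execute()` call, that method is a natural single place for logging or for storing commands.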

Events

Encapsulating commands in handlers prevents interaction between them, so the application structure is still neither flexible nor reactive. The solution is to use events.

  @@filename(hero-killed-dragon.event)
  export class HeroKilledDragonEvent {
    constructor(
      public readonly heroId: string,
      public readonly dragonId: string,
    ) {}
  }
  @@switch
  export class HeroKilledDragonEvent {
    constructor(heroId, dragonId) {
      this.heroId = heroId;
      this.dragonId = dragonId;
    }
  }

Events are asynchronous. They are dispatched either by models or directly using the EventBus. In order to dispatch events, models have to extend the AggregateRoot class.

  @@filename(hero.model)
  import { AggregateRoot } from '@nestjs/cqrs';

  export class Hero extends AggregateRoot {
    constructor(private readonly id: string) {
      super();
    }

    killEnemy(enemyId: string) {
      // logic
      this.apply(new HeroKilledDragonEvent(this.id, enemyId));
    }
  }
  @@switch
  import { AggregateRoot } from '@nestjs/cqrs';

  export class Hero extends AggregateRoot {
    constructor(id) {
      super();
      this.id = id;
    }

    killEnemy(enemyId) {
      // logic
      this.apply(new HeroKilledDragonEvent(this.id, enemyId));
    }
  }

The apply() method does not dispatch events yet because there is no relationship between the model and the EventPublisher class. How do we associate the model with the publisher? We need to use the publisher's mergeObjectContext() method inside our command handler.

  @@filename(kill-dragon.handler)
  import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs';

  @CommandHandler(KillDragonCommand)
  export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
    constructor(
      private readonly repository: HeroRepository,
      private readonly publisher: EventPublisher,
    ) {}

    async execute(command: KillDragonCommand) {
      const { heroId, dragonId } = command;
      // Connect the loaded model to the publisher so its events can be dispatched.
      const hero = this.publisher.mergeObjectContext(
        await this.repository.findOneById(+heroId),
      );
      hero.killEnemy(dragonId);
      // Dispatch the events collected by apply().
      hero.commit();
    }
  }
  @@switch
  import { Dependencies } from '@nestjs/common';
  import { CommandHandler, EventPublisher } from '@nestjs/cqrs';

  @CommandHandler(KillDragonCommand)
  @Dependencies(HeroRepository, EventPublisher)
  export class KillDragonHandler {
    constructor(repository, publisher) {
      this.repository = repository;
      this.publisher = publisher;
    }

    async execute(command) {
      const { heroId, dragonId } = command;
      // Connect the loaded model to the publisher so its events can be dispatched.
      const hero = this.publisher.mergeObjectContext(
        await this.repository.findOneById(+heroId),
      );
      hero.killEnemy(dragonId);
      // Dispatch the events collected by apply().
      hero.commit();
    }
  }

Now everything works as expected. Notice that we need to commit() events since they are not dispatched immediately. The object doesn't have to exist up front, either. We can easily merge a type context as well:

  const HeroModel = this.publisher.mergeContext(Hero);
  new HeroModel('id');
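The deferred dispatch described above (apply() collects, commit() dispatches, and a publisher has to be attached first) can be illustrated with a small, dependency-free sketch. It is a simplified stand-in for the idea and not the AggregateRoot or EventPublisher implementation from @nestjs/cqrs. `SketchAggregate`, `SketchHero` and `attachPublisher` are hypothetical names.

```typescript
// Simplified stand-in for the buffering idea; NOT the @nestjs/cqrs AggregateRoot.
type Publish = (event: object) => void;

class SketchAggregate {
  private pending: object[] = [];
  // No-op until a publisher is attached (a choice made for this sketch).
  private publish: Publish = () => {};

  attachPublisher(publish: Publish): void {
    this.publish = publish;
  }

  // Buffer the event instead of dispatching it right away.
  protected apply(event: object): void {
    this.pending.push(event);
  }

  // Flush all buffered events through the attached publisher.
  commit(): void {
    const events = this.pending;
    this.pending = [];
    events.forEach((event) => this.publish(event));
  }
}

class SketchHero extends SketchAggregate {
  constructor(private readonly id: string) {
    super();
  }

  killEnemy(enemyId: string): void {
    this.apply({ type: 'HeroKilledDragon', heroId: this.id, dragonId: enemyId });
  }
}

const published: object[] = [];
const sketchHero = new SketchHero('hero-1');
sketchHero.attachPublisher((event) => published.push(event));

sketchHero.killEnemy('dragon-7');
const countBeforeCommit = published.length; // 0: the event is only buffered

sketchHero.commit();
const countAfterCommit = published.length; // 1: commit() flushed the buffer
```

Buffering lets a handler run all of its model logic first and release the resulting events in one step.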

The model can now publish events, and we have to handle them. Additionally, we can emit events manually using the EventBus:

  // heroId and dragonId are the values required by the event's constructor.
  this.eventBus.publish(new HeroKilledDragonEvent(heroId, dragonId));

Hint: The EventBus is an injectable class.

Each event can have multiple Event Handlers.

  @@filename(hero-killed-dragon.handler)
  import { EventsHandler, IEventHandler } from '@nestjs/cqrs';

  @EventsHandler(HeroKilledDragonEvent)
  export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEvent> {
    constructor(private readonly repository: HeroRepository) {}

    handle(event: HeroKilledDragonEvent) {
      // logic
    }
  }

Now we can move the write logic into the event handlers.
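As an illustration of one event fanning out to several handlers, the following dependency-free sketch registers two listeners for the same event class. It is a simplified stand-in and not the @nestjs/cqrs EventBus. `SketchEventBus`, `HeroKilledDragonSketchEvent`, the score board and the audit trail are all hypothetical.

```typescript
// Simplified stand-in for an event bus; NOT the @nestjs/cqrs EventBus.
type EventClass<E> = new (...args: any[]) => E;
type Listener<E> = (event: E) => void;

class SketchEventBus {
  private readonly listeners = new Map<Function, Listener<any>[]>();

  subscribe<E>(type: EventClass<E>, listener: Listener<E>): void {
    const existing = this.listeners.get(type) ?? [];
    this.listeners.set(type, [...existing, listener]);
  }

  publish(event: object): void {
    // Every listener registered for the event's class receives it.
    const targets = this.listeners.get(event.constructor) ?? [];
    targets.forEach((listener) => listener(event));
  }
}

class HeroKilledDragonSketchEvent {
  constructor(public readonly heroId: string, public readonly dragonId: string) {}
}

const scores = new Map<string, number>();
const auditTrail: string[] = [];
const sketchEventBus = new SketchEventBus();

// Handler 1: update a score board (a write-side effect).
sketchEventBus.subscribe(HeroKilledDragonSketchEvent, (event) => {
  scores.set(event.heroId, (scores.get(event.heroId) ?? 0) + 1);
});

// Handler 2: keep an audit trail.
sketchEventBus.subscribe(HeroKilledDragonSketchEvent, (event) => {
  auditTrail.push(`${event.heroId} killed ${event.dragonId}`);
});

sketchEventBus.publish(new HeroKilledDragonSketchEvent('hero-1', 'dragon-7'));
```

Neither handler knows about the other, so new reactions can be added without touching the code that raised the event.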

Sagas

This type of Event-Driven Architecture improves application reactiveness and scalability. Now that we have events, we can react to them in various ways. Sagas are the last building block from the architectural point of view.

Sagas are an incredibly powerful feature. A single saga may listen for 1..* events. It can combine, merge, filter […] event streams. The RxJS library is where the magic comes from. Put simply, each saga has to return an Observable that contains a command. This command is dispatched asynchronously.

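  @@filename(heroes-game.saga)
  import { Injectable } from '@nestjs/common';
  import { ICommand, ofType, Saga } from '@nestjs/cqrs';
  import { Observable } from 'rxjs';
  import { map } from 'rxjs/operators';

  @Injectable()
  export class HeroesGameSagas {
    @Saga()
    dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
      return events$.pipe(
        ofType(HeroKilledDragonEvent),
        // fakeItemID is a placeholder for a real item identifier.
        map((event) => new DropAncientItemCommand(event.heroId, fakeItemID)),
      );
    }
  }
  @@switch
  import { Injectable } from '@nestjs/common';
  import { ofType, Saga } from '@nestjs/cqrs';
  import { map } from 'rxjs/operators';

  @Injectable()
  export class HeroesGameSagas {
    @Saga()
    dragonKilled = (events$) => {
      return events$.pipe(
        ofType(HeroKilledDragonEvent),
        // fakeItemID is a placeholder for a real item identifier.
        map((event) => new DropAncientItemCommand(event.heroId, fakeItemID)),
      );
    }
  }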

Hint: The ofType operator is exported from the @nestjs/cqrs package.

We declared a rule: when any hero kills the dragon, the ancient item should be dropped. The DropAncientItemCommand is then dispatched and processed by the appropriate handler.
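Stripped of the streaming machinery, the rule is a filter followed by a mapping from events to commands. The sketch below shows that shape with plain arrays standing in for RxJS Observables, so it illustrates the logic only and is not how a real saga is written. All class names and `ANCIENT_ITEM_ID` are hypothetical.

```typescript
// Plain arrays stand in for RxJS Observables; all names are hypothetical.
class DragonKilledSketchEvent {
  constructor(public readonly heroId: string, public readonly dragonId: string) {}
}

class HeroRestedSketchEvent {
  constructor(public readonly heroId: string) {}
}

class DropItemSketchCommand {
  constructor(public readonly heroId: string, public readonly itemId: string) {}
}

const ANCIENT_ITEM_ID = 'item-0'; // placeholder item id

// The "saga": keep only DragonKilledSketchEvent (the ofType step),
// then turn each one into a command (the map step).
function dragonKilledSaga(events: object[]): DropItemSketchCommand[] {
  return events
    .filter(
      (event): event is DragonKilledSketchEvent =>
        event instanceof DragonKilledSketchEvent,
    )
    .map((event) => new DropItemSketchCommand(event.heroId, ANCIENT_ITEM_ID));
}

const sagaCommands = dragonKilledSaga([
  new HeroRestedSketchEvent('hero-2'),
  new DragonKilledSketchEvent('hero-1', 'dragon-7'),
]);
```

In this sketch only the second event produces a command. The first is filtered out, just as `ofType` would skip events of other classes.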

Queries

The CqrsModule can also be handy for processing queries. The QueryBus works the same way as the CommandBus. Query handlers should implement the IQueryHandler interface and be marked with the @QueryHandler() decorator.
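The shape of a query and its handler mirrors the command side. The sketch below is dependency-free: `SketchQueryHandler` is a local stand-in for the handler contract, and `GetHeroSketchQuery`, `GetHeroSketchHandler` and `HeroView` are hypothetical. In a Nest application the handler class would instead implement IQueryHandler and carry the @QueryHandler() decorator, as described above. The result is returned synchronously here only to keep the sketch short.

```typescript
// Local stand-in for the handler contract; hypothetical, for illustration only.
interface SketchQueryHandler<Q, R> {
  execute(query: Q): R;
}

// Read-side view of a hero (hypothetical shape).
interface HeroView {
  id: string;
  name: string;
  dragonsKilled: number;
}

// A query is a plain object describing what should be read.
class GetHeroSketchQuery {
  constructor(public readonly heroId: string) {}
}

class GetHeroSketchHandler
  implements SketchQueryHandler<GetHeroSketchQuery, HeroView | undefined>
{
  // An in-memory read model stands in for a real data source.
  constructor(private readonly readModel: Map<string, HeroView>) {}

  execute(query: GetHeroSketchQuery): HeroView | undefined {
    // Queries only read; they never change state.
    return this.readModel.get(query.heroId);
  }
}

const heroReadModel = new Map<string, HeroView>([
  ['hero-1', { id: 'hero-1', name: 'Sigurd', dragonsKilled: 1 }],
]);
const getHeroHandler = new GetHeroSketchHandler(heroReadModel);
const foundHero = getHeroHandler.execute(new GetHeroSketchQuery('hero-1'));
const missingHero = getHeroHandler.execute(new GetHeroSketchQuery('hero-9'));
```

Keeping reads in their own handlers means the read model can be shaped for display without affecting the write side.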

Setup

The last thing we have to take care of is setting up the whole mechanism.

  @@filename(heroes-game.module)
  import { Module } from '@nestjs/common';
  import { CqrsModule } from '@nestjs/cqrs';

  export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler];
  export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler];

  @Module({
    imports: [CqrsModule],
    controllers: [HeroesGameController],
    providers: [
      HeroesGameService,
      HeroesGameSagas,
      ...CommandHandlers,
      ...EventHandlers,
      HeroRepository,
    ]
  })
  export class HeroesGameModule {}

Summary

CommandBus, QueryBus and EventBus are Observables. This means that you can easily subscribe to the whole stream and enrich your application with Event Sourcing.
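A rough idea of what Event Sourcing adds: every event seen on the stream is appended to a log, and current state is rebuilt by replaying that log. The sketch below is dependency-free and uses hypothetical event shapes. A function like `recordEvent` is what would be subscribed to the event stream, while the log here is just an in-memory array.

```typescript
// Hypothetical event shapes; a real application would store its own events.
type HeroSketchEvent =
  | { type: 'HeroKilledDragon'; heroId: string; dragonId: string }
  | { type: 'HeroFoundItem'; heroId: string; itemId: string };

interface HeroState {
  dragonsKilled: number;
  items: string[];
}

// Append-only log, filled by a subscriber to the event stream.
const eventLog: HeroSketchEvent[] = [];
const recordEvent = (event: HeroSketchEvent): void => {
  eventLog.push(event);
};

// Rebuild a hero's current state by replaying the logged events in order.
function replayHero(heroId: string, log: readonly HeroSketchEvent[]): HeroState {
  const state: HeroState = { dragonsKilled: 0, items: [] };
  for (const event of log) {
    if (event.heroId !== heroId) continue; // skip other heroes' events
    if (event.type === 'HeroKilledDragon') {
      state.dragonsKilled += 1;
    } else {
      state.items.push(event.itemId);
    }
  }
  return state;
}

recordEvent({ type: 'HeroKilledDragon', heroId: 'hero-1', dragonId: 'dragon-7' });
recordEvent({ type: 'HeroFoundItem', heroId: 'hero-1', itemId: 'item-0' });
recordEvent({ type: 'HeroKilledDragon', heroId: 'hero-2', dragonId: 'dragon-8' });

const replayedHero = replayHero('hero-1', eventLog);
```

The log, rather than a mutable row, is the source of truth in this approach, so state at any earlier point can be recovered by replaying only a prefix of it.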

A working example is available here.